kalenislims/lims/report/lims_report.py
Sebastián Marró 35f6cdea73 Initial commit
2017-10-07 21:23:22 -03:00

2066 lines
83 KiB
Python

# -*- coding: utf-8 -*-
# This file is part of lims module for Tryton.
# The COPYRIGHT file at the top level of this repository contains
# the full copyright notices and license terms.
try:
import cStringIO as StringIO
except ImportError:
import StringIO
import cups
from datetime import datetime
from trytond.model import ModelView, fields
from trytond.wizard import Wizard, StateTransition, StateView, StateAction, \
StateReport, Button
from trytond.report import Report
from trytond.pool import Pool
from trytond.transaction import Transaction
from trytond.rpc import RPC
__all__ = ['LimsPrintAcknowledgmentOfReceipt', 'LimsAcknowledgmentOfReceipt',
'LimsEntryDetail', 'LimsEntryLabels', 'LimsEntryLabelsPrinter',
'LimsSampleLabelsPrinter', 'LimsPrintControlChart',
'LimsControlChartReport', 'LimsResultReport', 'LimsGlobalResultReport',
'LimsResultReportTranscription', 'LimsCountersampleStoragePrintStart',
'LimsCountersampleStoragePrint', 'LimsCountersampleStorageReport',
'LimsCountersampleDischargePrintStart', 'LimsCountersampleDischargePrint',
'LimsCountersampleDischargeReport', 'PrintAnalysisPendingInformStart',
'PrintAnalysisPendingInform', 'AnalysisPendingInform',
'PrintAnalysisCheckedPendingInformStart',
'PrintAnalysisCheckedPendingInform', 'AnalysisCheckedPendingInform']
def get_print_date():
Company = Pool().get('company.company')
date = datetime.now()
company_id = Transaction().context.get('company')
if company_id:
date = Company(company_id).convert_timezone_datetime(date)
return date
class LimsPrintAcknowledgmentOfReceipt(Wizard):
'Print Acknowledgment of Samples Receipt'
__name__ = 'lims.entry.acknowledgment.print'
start = StateTransition()
print_ = StateReport('lims.entry.acknowledgment.report')
def transition_start(self):
LimsEntry = Pool().get('lims.entry')
data_ids = Transaction().context['active_ids'][:]
while len(data_ids) > 0:
data_id = data_ids.pop()
entry = LimsEntry(data_id)
if entry.state == 'ongoing':
printable = False
for sample in entry.samples:
if not sample.fractions:
break
for fraction in sample.fractions:
if (fraction.confirmed and fraction.services
and not fraction.cie_fraction_type):
printable = True
break
if printable:
return 'print_'
else:
Transaction().context['active_ids'].remove(data_id)
else:
Transaction().context['active_ids'].remove(data_id)
return 'end'
def do_print_(self, action):
data = {}
data['id'] = Transaction().context['active_ids'].pop()
data['ids'] = [data['id']]
return action, data
def transition_print_(self):
if Transaction().context.get('active_ids'):
return 'start'
return 'end'
class LimsAcknowledgmentOfReceipt(Report):
'Acknowledgment of Samples Receipt'
__name__ = 'lims.entry.acknowledgment.report'
@classmethod
def __setup__(cls):
super(LimsAcknowledgmentOfReceipt, cls).__setup__()
cls.__rpc__['execute'] = RPC(False)
@classmethod
def execute(cls, ids, data):
LimsEntry = Pool().get('lims.entry')
result = super(LimsAcknowledgmentOfReceipt, cls).execute(ids, data)
entry = LimsEntry(ids[0])
if entry.ack_report_cache:
result = (entry.ack_report_format,
entry.ack_report_cache) + result[2:]
else:
entry.ack_report_format, entry.ack_report_cache = result[:2]
entry.save()
return result
@classmethod
def get_context(cls, records, data):
pool = Pool()
Company = pool.get('company.company')
LimsService = pool.get('lims.service')
LimsEntry = pool.get('lims.entry')
report_context = super(LimsAcknowledgmentOfReceipt, cls).get_context(
records, data)
if 'id' in data:
entry = LimsEntry(data['id'])
else:
entry = records[0]
company = Company(Transaction().context.get('company'))
report_context['company'] = company
samples = []
record = {
'party': entry.party.rec_name,
'samples': samples,
}
for sample in entry.samples:
if not sample.fractions:
continue
confirmed = False
for fraction in sample.fractions:
if (fraction.confirmed and fraction.services
and not fraction.cie_fraction_type):
confirmed = True
break
if not confirmed:
continue
services = {}
sample_data = {
'label': sample.label,
'producer': (sample.producer.rec_name if sample.producer
else ''),
'date': sample.date,
'product_type': sample.product_type.rec_name,
'client_description': sample.sample_client_description,
'number': sample.number,
'services': services,
}
samples.append(sample_data)
services_obj = LimsService.search([
('sample', '=', sample.id),
('fraction.confirmed', '=', True),
('fraction.cie_fraction_type', '=', False),
])
for service in services_obj:
if (service.analysis.type == 'analysis' and not
cls.get_analysis_reportable(sample.product_type,
sample.matrix, service.analysis, service.method)):
continue
if service.analysis.id not in services:
s_methods = {}
services[service.analysis.id] = {
'code': service.analysis.code,
'name': service.analysis.description,
'methods': s_methods,
}
if service.analysis.type == 'analysis':
s_methods[service.method.id] = {
'method': service.method,
'analysis': [],
'enac': False,
}
acredited = cls.get_accreditation(
sample.product_type,
sample.matrix,
service.analysis,
service.method)
if acredited:
s_methods[service.method.id]['enac'] = True
else:
ia_methods = cls.get_included_analysis(
service.analysis.id, service.fraction.id)
for ia in ia_methods:
if ia['method_id'] not in s_methods:
s_methods[ia['method_id']] = {
'method': ia['method'],
'analysis': [],
'enac': False,
}
s_methods[ia['method_id']]['analysis'].append({
'acredited': ia['acredited'],
'analysis': ia['analysis'],
})
if (not s_methods[ia['method_id']]['enac'] and
ia['acredited']):
s_methods[ia['method_id']]['enac'] = True
for v in s_methods.itervalues():
if v['enac']:
v['enac_label'] = (LimsEntry.raise_user_error(
'enac_acredited', raise_exception=False))
else:
v['enac_label'] = ''
sorted_analysis = sorted(v['analysis'],
key=lambda x: x['analysis'].description)
v['analysis'] = sorted_analysis
report_context['records'] = [record]
return report_context
@classmethod
def get_included_analysis(cls, analysis_id, fraction_id):
pool = Pool()
LimsAnalysisIncluded = pool.get('lims.analysis.included')
LimsEntryDetailAnalysis = pool.get('lims.entry.detail.analysis')
childs = []
included_analysis = LimsAnalysisIncluded.search([
('analysis', '=', analysis_id),
])
for analysis in included_analysis:
if analysis.included_analysis.type == 'analysis':
analysis_detail = LimsEntryDetailAnalysis.search([
('fraction', '=', fraction_id),
('analysis', '=', analysis.included_analysis.id),
])
if analysis_detail:
analysis_detail = analysis_detail[0]
if cls.get_analysis_reportable(
analysis_detail.sample.product_type,
analysis_detail.sample.matrix,
analysis_detail.analysis,
analysis_detail.method):
childs.append({
'method_id': analysis_detail.method.id,
'method': analysis_detail.method,
'analysis': analysis.included_analysis,
'acredited': cls.get_accreditation(
analysis_detail.sample.product_type,
analysis_detail.sample.matrix,
analysis_detail.analysis,
analysis_detail.method),
})
childs.extend(cls.get_included_analysis(
analysis.included_analysis.id, fraction_id))
return childs
@classmethod
def get_accreditation(cls, product_type, matrix, analysis, method):
pool = Pool()
LimsTypification = pool.get('lims.typification')
typifications = LimsTypification.search([
('product_type', '=', product_type),
('matrix', '=', matrix),
('analysis', '=', analysis),
('method', '=', method),
('valid', '=', True),
])
if typifications:
if typifications[0].technical_scope_versions:
for version in typifications[0].technical_scope_versions:
certification_type = (
version.technical_scope.certification_type)
if certification_type and certification_type.report:
return True
return False
@classmethod
def get_analysis_reportable(cls, product_type, matrix, analysis, method):
LimsTypification = Pool().get('lims.typification')
if analysis.behavior == 'additional':
return False
typifications = LimsTypification.search([
('product_type', '=', product_type),
('matrix', '=', matrix),
('analysis', '=', analysis),
('method', '=', method),
('valid', '=', True),
])
if typifications:
return typifications[0].report
return True
class LimsEntryDetail(Report):
'Entry Detail'
__name__ = 'lims.entry.detail.report'
@classmethod
def get_context(cls, records, data):
report_context = super(LimsEntryDetail, cls).get_context(records, data)
Company = Pool().get('company.company')
company = Company(Transaction().context.get('company'))
report_context['company'] = company
return report_context
class LimsEntryLabels(Report):
'Entry Labels'
__name__ = 'lims.entry.labels.report'
@classmethod
def get_context(cls, records, data):
report_context = super(LimsEntryLabels, cls).get_context(records, data)
labels = []
for entry in records:
for sample in entry.samples:
for fraction in sample.fractions:
labels.append(fraction)
report_context['labels'] = labels
return report_context
class LimsEntryLabelsPrinter(Wizard):
'Entry Labels Printer'
__name__ = 'lims.entry.labels.printer.report'
start = StateTransition()
def transition_start(self):
pool = Pool()
User = pool.get('res.user')
Entry = pool.get('lims.entry')
user = User(Transaction().user)
if not user.printer:
return 'end'
s = u'\n'
s += u'q750\n'
s += u'I8,A\n'
entry = Entry(Transaction().context.get('active_id'))
for sample in entry.samples:
for fraction in sample.fractions:
if (fraction.shared):
c = 'C'
else:
c = ''
if sample.restricted_entry:
f = 'F'
else:
f = ''
labelline = fraction.label
if len(labelline) > 52:
line1 = labelline[0:52]
line2 = labelline[52:105]
else:
line1 = fraction.label
line2 = ''
s += u'N\n'
s += u'A90,0,0,3,1,2,N,"' + fraction.number + '"\n'
s += (u'A320,0,0,3,1,2,N,"' +
fraction.storage_location.code + '"\n')
s += u'A90,40,0,1,1,2,N,"' + line1 + '"\n'
s += u'A90,65,0,1,1,2,N,"' + line2 + '"\n'
s += (u'A90,100,0,1,1,2,N,"' +
fraction.sample.product_type.description + ' - ' +
fraction.sample.matrix.description + '"\n')
s += u'B90,125,0,1,2,0,60,N,"' + fraction.number + '"\n'
s += u'A500,140,0,3,2,2,N,"' + c + ' ' + f + '"\n'
s += u'P' + str(fraction.packages_quantity) + '\n'
labels_to_print = open('labels', 'w')
labels_to_print.write(s.encode('cp1252', 'ignore'))
labels_to_print.close()
conn = cups.Connection()
conn.printFile(user.printer.name, 'labels', 'Labels', {})
return 'end'
class LimsSampleLabelsPrinter(Wizard):
'Sample Labels Printer'
__name__ = 'lims.sample.labels.printer.report'
start = StateTransition()
def transition_start(self):
pool = Pool()
User = pool.get('res.user')
Sample = pool.get('lims.sample')
user = User(Transaction().user)
if not user.printer:
return 'end'
s = u'\n'
s += u'q750\n'
s += u'I8,A\n'
for active_id in Transaction().context['active_ids']:
sample = Sample(active_id)
for fraction in sample.fractions:
if (fraction.shared):
c = 'C'
else:
c = ''
if sample.restricted_entry:
f = 'F'
else:
f = ''
labelline = fraction.label
if len(labelline) > 52:
line1 = labelline[0:52]
line2 = labelline[52:105]
else:
line1 = fraction.label
line2 = ''
s += u'N\n'
s += u'A90,0,0,3,1,2,N,"' + fraction.number + '"\n'
s += (u'A320,0,0,3,1,2,N,"' +
fraction.storage_location.code + '"\n')
s += u'A90,40,0,1,1,2,N,"' + line1 + '"\n'
s += u'A90,65,0,1,1,2,N,"' + line2 + '"\n'
s += (u'A90,100,0,1,1,2,N,"' +
fraction.sample.product_type.description + ' - ' +
fraction.sample.matrix.description + '"\n')
s += u'B90,125,0,1,2,0,60,N,"' + fraction.number + '"\n'
s += u'A500,140,0,3,2,2,N,"' + c + ' ' + f + '"\n'
s += u'P' + str(fraction.packages_quantity) + '\n'
labels_to_print = open('labels', 'w')
labels_to_print.write(s.encode('cp1252', 'ignore'))
labels_to_print.close()
conn = cups.Connection()
conn.printFile(user.printer.name, 'labels', 'Labels', {})
return 'end'
class LimsPrintControlChart(Wizard):
'Control Chart'
__name__ = 'lims.control_chart.print'
start = StateTransition()
print_ = StateAction('lims.report_control_chart')
def transition_start(self):
if Transaction().context.get('print_available'):
return 'print_'
return 'end'
def do_print_(self, action):
data = {}
data['id'] = Transaction().context['active_ids'].pop()
data['ids'] = [data['id']]
return action, data
def transition_print_(self):
if Transaction().context.get('active_ids'):
return 'start'
return 'end'
class LimsControlChartReport(Report):
'Control Chart'
__name__ = 'lims.control_chart.report'
@classmethod
def get_context(cls, reports, data):
report_context = super(LimsControlChartReport, cls).get_context(
reports, data)
pool = Pool()
Company = pool.get('company.company')
LimsControlTendency = pool.get('lims.control.tendency')
if 'id' in data:
tendency = LimsControlTendency(data['id'])
else:
tendency = LimsControlTendency(reports[0].id)
company = Company(Transaction().context.get('company'))
report_context['company'] = company
report_context['title'] = tendency.analysis.rec_name
records_limit = 100
details = []
count = 1
for detail in tendency.details:
report_context['number_' + str(count)] = count
report_context['result_' + str(count)] = detail.result
report_context['ucl_' + str(count)] = tendency.ucl
report_context['uwl_' + str(count)] = tendency.uwl
report_context['upl_' + str(count)] = tendency.upl
report_context['cl_' + str(count)] = tendency.cl
report_context['lwl_' + str(count)] = tendency.lwl
report_context['lcl_' + str(count)] = tendency.lcl
report_context['lpl_' + str(count)] = tendency.lpl
report_context['cv_' + str(count)] = tendency.cv
details.append({
'number': count,
'result': detail.result,
})
count += 1
if count > records_limit:
break
report_context['details'] = details
if count <= records_limit:
for x in range(count, records_limit + 1):
report_context['number_' + str(x)] = ''
report_context['result_' + str(x)] = ''
report_context['ucl_' + str(x)] = ''
report_context['uwl_' + str(x)] = ''
report_context['upl_' + str(x)] = ''
report_context['cl_' + str(x)] = ''
report_context['lwl_' + str(x)] = ''
report_context['lcl_' + str(x)] = ''
report_context['lpl_' + str(x)] = ''
report_context['cv_' + str(x)] = ''
return report_context
class LimsResultReport(Report):
'Results Report'
__name__ = 'lims.result_report'
@classmethod
def __setup__(cls):
super(LimsResultReport, cls).__setup__()
cls.__rpc__['execute'] = RPC(False)
@classmethod
def execute(cls, ids, data):
ResultsReport = Pool().get('lims.results_report.version.detail')
if len(ids) > 1:
ResultsReport.raise_user_error('multiple_reports')
results_report = ResultsReport(ids[0])
if results_report.state == 'annulled':
ResultsReport.raise_user_error('annulled_report')
if data is None:
data = {}
current_data = data.copy()
current_data['alt_lang'] = None
result_orig = super(LimsResultReport, cls).execute(ids, current_data)
current_data['alt_lang'] = 'en'
result_eng = super(LimsResultReport, cls).execute(ids, current_data)
save = False
if results_report.english_report:
if results_report.report_cache_eng:
result = (results_report.report_format_eng,
results_report.report_cache_eng) + result_eng[2:]
else:
result = result_eng
if ('english_report' in current_data and
current_data['english_report']):
results_report.report_format_eng, \
results_report.report_cache_eng = result_eng[:2]
save = True
else:
if results_report.report_cache:
result = (results_report.report_format,
results_report.report_cache) + result_orig[2:]
else:
result = result_orig
if ('english_report' in current_data and
not current_data['english_report']):
results_report.report_format, \
results_report.report_cache = result_orig[:2]
save = True
if save:
results_report.save()
return result
@classmethod
def get_context(cls, records, data):
pool = Pool()
Company = pool.get('company.company')
ResultsReport = pool.get('lims.results_report.version.detail')
ResultsReportLine = pool.get('lims.results_report.version.detail.line')
LimsNotebookLine = pool.get('lims.notebook.line')
LimsSample = pool.get('lims.sample')
LimsRangeType = pool.get('lims.range.type')
report_context = super(LimsResultReport, cls).get_context(
records, data)
if data.get('alt_lang'):
lang_code = data['alt_lang']
else:
lang_code = report_context['user'].language.code
report_context['alt_lang'] = lang_code
with Transaction().set_context(language=lang_code):
if 'id' in data:
report = ResultsReport(data['id'])
else:
report = ResultsReport(records[0].id)
company = Company(Transaction().context.get('company'))
report_context['company'] = company
report_context['number'] = "%s-%s" % (report.report_version.number,
report.number)
report_context['replace_number'] = ''
if report.number != '1':
with Transaction().set_context(language=lang_code):
prev_number = "%s-%s" % (report.report_version.number,
int(report.number) - 1)
report_context['replace_number'] = (
ResultsReport.raise_user_error('replace_number',
(prev_number,), raise_exception=False))
report_context['print_date'] = get_print_date()
report_context['party'] = (
report.report_version.results_report.party.rec_name)
party_address = (
report.report_version.results_report.party.address_get(
type='invoice'))
report_context['party_address'] = party_address.full_address.replace(
'\n', ' - ')
report_context['report_section'] = report.report_section
report_context['report_type'] = report.report_type
report_context['report_result_type'] = report.report_result_type
group_field = ('final_concentration' if
report.report_result_type == 'both' else
'initial_concentration')
report_context['signer'] = ''
report_context['signer_role'] = ''
report_context['signature'] = ''
if report.signer:
report_context['signer'] = report.signer.rec_name
report_context['signer_role'] = report.signer.role
if report.signer.signature:
report_context['signature'] = StringIO.StringIO(str(
report.signer.signature))
enac = False
enac_all_acredited = True
initial_unit = None
min_start_date = None
max_end_date = None
min_confirmation_date = None
obs_ql = False
obs_dl = False
obs_uncert = False
obs_result_range = False
report_context['range_title'] = ''
if report.report_result_type == 'result_range':
obs_result_range = True
with Transaction().set_context(language=lang_code):
range_type = LimsRangeType(report.resultrange_origin.id)
report_context['range_title'] = range_type.resultrange_title
obs_rm_c_f = False
tas_project = False
stp_project = False
stp_polisample_project = False
alcohol = False
dry_matter = False
comments = {}
fractions = {}
methods = {}
pnt_methods = {}
notebook_lines = ResultsReportLine.search([
('report_version_detail.report_version.id', '=',
report.report_version.id), ['OR',
('report_version_detail.id', '=', report.id),
('report_version_detail.valid', '=', True)],
], order=[('report_version_detail', 'ASC')])
if not notebook_lines:
ResultsReport.raise_user_error('empty_report')
with Transaction().set_context(language=lang_code):
reference_sample = LimsSample(
notebook_lines[0].notebook_line.fraction.sample.id)
if (report_context['report_section'] == 'rp' and
hasattr(reference_sample.entry, 'project_type') and
getattr(reference_sample.entry, 'project_type') ==
'study_plan'):
if report.report_type == 'normal' and not stp_project:
stp_project = True
if (report.report_type == 'polisample' and not
stp_polisample_project):
stp_polisample_project = True
if (hasattr(reference_sample.entry, 'project_type') and
getattr(reference_sample.entry, 'project_type') ==
'tas'):
tas_project = True
for line in notebook_lines:
with Transaction().set_context(language=lang_code):
t_line = LimsNotebookLine(line.notebook_line.id)
sample = LimsSample(line.notebook_line.fraction.sample.id)
key = t_line.fraction.id
if key not in fractions:
fractions[key] = {
'fraction': sample.number,
'client_description': (
sample.sample_client_description),
'number': sample.number,
'label': '(%s - %s)' % (sample.number,
sample.label),
'packages_quantity': sample.packages_quantity,
'package_type': (sample.package_type.description
if sample.package_type else ''),
'package_state': (
sample.package_state.description
if sample.package_state else ''),
'concentrations': {},
}
if (report.report_section == 'rp' and
report.report_type == 'polisample'):
fractions[key]['label'] = sample.label
if stp_polisample_project:
fractions[key]['stp_code'] = sample.entry.project.code
fractions[key]['stp_application_date'] = (
sample.application_date)
fractions[key]['stp_sampling_date'] = sample.sampling_date
fractions[key]['stp_zone'] = (sample.cultivation_zone
if sample.cultivation_zone else '')
fractions[key]['stp_after_application_days'] = (
sample.after_application_days)
fractions[key]['stp_treatment'] = sample.treatment
fractions[key]['stp_dosis'] = sample.dosis
fractions[key]['stp_repetition'] = sample.glp_repetitions
fractions[key]['stp_z_senasa_protocol'] = (
sample.z_senasa_protocol)
fractions[key]['stp_variety'] = (
sample.variety.description if
sample.variety else '')
record = {
'order': t_line.analysis.order or 9999,
'acredited': cls.get_accreditation(
t_line.notebook.product_type,
t_line.notebook.matrix,
t_line.analysis,
t_line.method),
'pnt': t_line.method.pnt,
}
record['analysis'] = cls.get_analysis(
report_context['report_section'], t_line, language=lang_code)
record['result'], obs_ql = cls.get_result(
report_context['report_section'], t_line, obs_ql,
language=lang_code)
record['rp_order'] = float(2)
try:
record['rp_order'] = float(record['result']) * -1
except (TypeError, ValueError):
try:
if str(record['result']).startswith('<'):
record['rp_order'] = float(1)
except UnicodeEncodeError:
pass
record['converted_result'], obs_ql = cls.get_converted_result(
report_context['report_section'],
report_context['report_result_type'], t_line, obs_ql,
language=lang_code)
record['initial_unit'], obs_dl, obs_uncert = cls.get_initial_unit(
report_context['report_section'],
report_context['report_result_type'], t_line, obs_dl,
obs_uncert, language=lang_code)
record['final_unit'], obs_dl, obs_uncert = cls.get_final_unit(
report_context['report_section'],
report_context['report_result_type'], t_line, obs_dl,
obs_uncert, language=lang_code)
record['detection_limit'] = cls.get_detection_limit(
report_context['report_section'],
report_context['report_result_type'], t_line,
language=lang_code)
record['reference'] = ''
if obs_result_range:
record['reference'] = unicode(cls.get_reference(range_type,
t_line, lang_code, report_context['report_section']))
if (t_line.rm_correction_formula and (record['result'] or
(record['converted_result'] and
report_context['report_result_type'] == 'both'))):
obs_rm_c_f = True
#no es necesario identificar mas el analito con (c)
#por eso quedan iguales linea 811 y 813
record['corrected'] = ''
else:
record['corrected'] = ''
conc = getattr(t_line, group_field)
if conc not in fractions[key]['concentrations']:
fractions[key]['concentrations'][conc] = []
fractions[key]['concentrations'][conc].append(record)
if not enac and record['acredited'] == 'True':
enac = True
if enac_all_acredited and record['acredited'] == 'False':
enac_all_acredited = False
if not initial_unit and t_line.initial_unit:
initial_unit = t_line.initial_unit.rec_name
entry_id = t_line.fraction.sample.entry.id
if entry_id not in comments:
comments[entry_id] = {
'report_comments': (
t_line.fraction.sample.entry.report_comments),
'samples': {},
}
if sample.id not in comments[entry_id]['samples']:
comments[entry_id]['samples'][sample.id] = (
sample.report_comments)
method_id = t_line.method.id
if method_id not in methods:
methods[method_id] = {
'method': t_line.method.name,
'analysis': [],
}
methods[method_id]['analysis'].append(record['analysis'])
if record['pnt'] not in pnt_methods:
pnt_methods[record['pnt']] = {
'pnt': record['pnt'],
'method': t_line.method.name,
}
if not reference_sample or sample.date < reference_sample.date:
with Transaction().set_context(language=lang_code):
reference_sample = LimsSample(sample.id)
if (not min_start_date or
t_line.start_date < min_start_date):
min_start_date = t_line.start_date
if (not max_end_date or
t_line.end_date > max_end_date):
max_end_date = t_line.end_date
if (not min_confirmation_date or
(t_line.analysis_detail.confirmation_date and
t_line.analysis_detail.confirmation_date <
min_confirmation_date)):
min_confirmation_date = (
t_line.analysis_detail.confirmation_date)
with Transaction().set_context(language=lang_code):
report_context['sample_producer'] = (
reference_sample.producer.rec_name if reference_sample.producer
else ResultsReport.raise_user_error('data_not_specified',
raise_exception=False))
report_context['sample_date'] = min_confirmation_date
report_context['min_start_date'] = min_start_date
report_context['max_end_date'] = max_end_date
report_context['sample_packages_quantity'] = (
reference_sample.packages_quantity)
report_context['sample_package_type'] = (
reference_sample.package_type.description
if reference_sample.package_type else '')
report_context['sample_package_state'] = (
reference_sample.package_state.description
if reference_sample.package_state else '')
if report.report_type == 'normal':
report_context['sample_label'] = (
reference_sample.label)
report_context['sample_client_description'] = (
reference_sample.sample_client_description)
report_context['sample_number'] = reference_sample.number
if report_context['report_section'] == 'for':
report_context['sample_prodct_type'] = (
reference_sample.product_type.description)
report_context['sample_matrix'] = (
reference_sample.matrix.description)
if tas_project:
report_context['tas_code'] = reference_sample.entry.project.code
if stp_project:
report_context['stp_code'] = reference_sample.entry.project.code
report_context['stp_application_date'] = (
reference_sample.application_date)
report_context['stp_sampling_date'] = (
reference_sample.sampling_date)
report_context['stp_zone'] = (reference_sample.cultivation_zone
if reference_sample.cultivation_zone else '')
report_context['stp_after_application_days'] = (
reference_sample.after_application_days)
report_context['stp_treatment'] = reference_sample.treatment
report_context['stp_dosis'] = reference_sample.dosis
report_context['stp_repetition'] = reference_sample.glp_repetitions
report_context['stp_z_senasa_protocol'] = (
reference_sample.z_senasa_protocol)
report_context['stp_variety'] = (
reference_sample.variety.description if
reference_sample.variety else '')
if stp_polisample_project:
report_context['stp_code'] = reference_sample.entry.project.code
report_context['tas_project'] = 'True' if tas_project else 'False'
report_context['stp_project'] = 'True' if stp_project else 'False'
report_context['stp_polisample_project'] = ('True' if
stp_polisample_project else 'False')
if reference_sample.product_type.code == 'VINO':
alcohol = True
if (report_context['report_section'] in ('amb', 'sq') and
reference_sample.matrix.code in ('SUELO', 'LODO')):
dry_matter = True
sorted_fractions = sorted(fractions.values(),
key=lambda x: x['fraction'])
with Transaction().set_context(language=lang_code):
for fraction in sorted_fractions:
for conc, lines in fraction['concentrations'].iteritems():
if report_context['report_section'] == 'rp':
sorted_lines = sorted(lines, key=lambda x: (
x['rp_order'], x['analysis']))
else:
sorted_lines = sorted(lines, key=lambda x: (
x['order'], x['analysis']))
fraction['concentrations'][conc] = {
'label': '',
'unit_label': '',
'lines': sorted_lines,
}
conc_is_numeric = True
try:
numeric_conc = float(conc)
except (TypeError, ValueError):
conc_is_numeric = False
hide_concentration_label = (
report_context['report_section'] in ('amb', 'sq')
and report_context['report_result_type'] == 'both')
if conc and conc != '-' and not hide_concentration_label:
if conc == 'Muestra Recibida':
fraction['concentrations'][conc]['label'] = (
ResultsReport.raise_user_error(
'concentration_label_1',
raise_exception=False))
elif conc_is_numeric and numeric_conc < 100:
fraction['concentrations'][conc]['label'] = (
ResultsReport.raise_user_error(
'concentration_label_2', (conc,),
raise_exception=False))
else:
fraction['concentrations'][conc]['label'] = (
ResultsReport.raise_user_error(
'concentration_label_3', (conc,),
raise_exception=False))
show_unit_label = False
for line in sorted_lines:
if line['converted_result']:
show_unit_label = True
break
if show_unit_label:
if dry_matter:
fraction['concentrations'][conc][
'unit_label'] = (
ResultsReport.raise_user_error(
'final_unit_label_4',
raise_exception=False))
else:
if conc_is_numeric:
if alcohol:
fraction['concentrations'][conc][
'unit_label'] = (
ResultsReport.raise_user_error(
'final_unit_label_1', (conc,),
raise_exception=False))
else:
fraction['concentrations'][conc][
'unit_label'] = (
ResultsReport.raise_user_error(
'final_unit_label_3', (conc,),
raise_exception=False))
else:
fraction['concentrations'][conc][
'unit_label'] = (
ResultsReport.raise_user_error(
'final_unit_label_2', (conc,),
raise_exception=False))
report_context['fractions'] = sorted_fractions
report_context['methods'] = []
for method in methods.itervalues():
concat_lines = ', '.join(list(set(method['analysis'])))
method['analysis'] = concat_lines
report_context['methods'].append(method)
report_context['pnt_methods'] = [m for m in pnt_methods.itervalues()]
report_context['enac'] = 'True' if enac else 'False'
if enac:
with Transaction().set_context(language=lang_code):
if enac_all_acredited:
report_context['enac_label'] = (
ResultsReport.raise_user_error(
'enac_all_acredited', raise_exception=False))
else:
report_context['enac_label'] = (
ResultsReport.raise_user_error(
'enac_acredited', raise_exception=False))
else:
report_context['enac_label'] = ''
report_context['initial_unit'] = initial_unit
report_context['comments'] = ''
for entry_comment in comments.itervalues():
if entry_comment['report_comments']:
if report_context['comments']:
report_context['comments'] += '\n'
report_context['comments'] += entry_comment['report_comments']
for sample_comment in entry_comment['samples'].itervalues():
if sample_comment:
if report_context['comments']:
report_context['comments'] += '\n'
report_context['comments'] += sample_comment
if report.comments:
if report_context['comments']:
report_context['comments'] += '\n'
report_context['comments'] += report.comments
if obs_ql and report_context['report_section']:
with Transaction().set_context(language=lang_code):
if report_context['comments']:
report_context['comments'] += '\n'
report_context['comments'] += (
ResultsReport.raise_user_error('obs_ql',
raise_exception=False))
if obs_dl and report_context['report_section']:
with Transaction().set_context(language=lang_code):
if report_context['comments']:
report_context['comments'] += '\n'
report_context['comments'] += (
ResultsReport.raise_user_error('obs_dl',
raise_exception=False))
if obs_uncert:
with Transaction().set_context(language=lang_code):
if report_context['comments']:
report_context['comments'] += '\n'
report_context['comments'] += (
ResultsReport.raise_user_error('obs_uncert',
raise_exception=False))
if obs_result_range and range_type.resultrange_comments:
if report_context['comments']:
report_context['comments'] += '\n'
report_context['comments'] += range_type.resultrange_comments
if obs_rm_c_f:
with Transaction().set_context(language=lang_code):
if report_context['comments']:
report_context['comments'] += '\n'
report_context['comments'] += (
ResultsReport.raise_user_error('obs_rm_c_f',
raise_exception=False))
return report_context
@classmethod
def get_accreditation(cls, product_type, matrix, analysis, method):
pool = Pool()
LimsTypification = pool.get('lims.typification')
typifications = LimsTypification.search([
('product_type', '=', product_type),
('matrix', '=', matrix),
('analysis', '=', analysis),
('method', '=', method),
('valid', '=', True),
])
if typifications:
if typifications[0].technical_scope_versions:
for version in typifications[0].technical_scope_versions:
certification_type = (
version.technical_scope.certification_type)
if certification_type and certification_type.report:
return 'True'
return 'False'
@classmethod
def get_analysis(cls, report_section, notebook_line, language):
pool = Pool()
LimsAnalysis = pool.get('lims.analysis')
with Transaction().set_context(language=language):
analysis = LimsAnalysis(notebook_line.analysis.id)
res = analysis.description
if report_section == 'mi':
if analysis.gender_species:
res = analysis.gender_species
return res
@classmethod
def get_result(cls, report_section, notebook_line, obs_ql, language):
pool = Pool()
ResultsReport = pool.get('lims.results_report.version.detail')
literal_result = notebook_line.literal_result
result = notebook_line.result
decimals = notebook_line.decimals
result_modifier = notebook_line.result_modifier
with Transaction().set_context(language=language):
res = ''
if report_section in ('amb', 'for', 'rp', 'sq'):
if literal_result:
res = literal_result
else:
if result:
res = round(float(result), decimals)
if decimals == 0:
res = int(res)
else:
res = ''
if result_modifier == 'eq':
res = res
elif result_modifier == 'low':
res = ResultsReport.raise_user_error(
'quantification_limit', (res,),
raise_exception=False)
obs_ql = True
elif result_modifier == 'nd':
res = ResultsReport.raise_user_error('nd',
raise_exception=False)
elif result_modifier == 'ni':
res = ''
elif result_modifier == 'pos':
res = ResultsReport.raise_user_error('pos',
raise_exception=False)
elif result_modifier == 'neg':
res = ResultsReport.raise_user_error('neg',
raise_exception=False)
elif result_modifier == 'pre':
res = ResultsReport.raise_user_error('pre',
raise_exception=False)
elif result_modifier == 'abs':
res = ResultsReport.raise_user_error('abs',
raise_exception=False)
else:
res = result_modifier
elif report_section == 'mi':
if literal_result:
res = literal_result
else:
if result:
res = round(float(result), decimals)
if decimals == 0:
res = int(res)
else:
res = ''
if result_modifier == 'eq':
res = res
elif result_modifier == 'low':
res = '< %s' % res
elif result_modifier == 'nd':
res = ResultsReport.raise_user_error('nd',
raise_exception=False)
elif result_modifier == 'pos':
res = ResultsReport.raise_user_error('pos',
raise_exception=False)
elif result_modifier == 'neg':
res = ResultsReport.raise_user_error('neg',
raise_exception=False)
elif result_modifier == 'pre':
res = ResultsReport.raise_user_error('pre',
raise_exception=False)
elif result_modifier == 'abs':
res = ResultsReport.raise_user_error('abs',
raise_exception=False)
return res, obs_ql
@classmethod
def get_converted_result(cls, report_section, report_result_type,
notebook_line, obs_ql, language):
pool = Pool()
ResultsReport = pool.get('lims.results_report.version.detail')
if (report_section in ('for', 'mi', 'rp') or
report_result_type != 'both'):
return '', obs_ql
literal_result = notebook_line.literal_result
converted_result = notebook_line.converted_result
analysis = notebook_line.analysis.code
decimals = notebook_line.decimals
converted_result_modifier = notebook_line.converted_result_modifier
with Transaction().set_context(language=language):
res = ''
if analysis != '0001' and not literal_result:
if converted_result_modifier == 'neg':
res = ResultsReport.raise_user_error('neg',
raise_exception=False)
elif converted_result_modifier == 'pos':
res = ResultsReport.raise_user_error('pos',
raise_exception=False)
elif converted_result_modifier == 'pre':
res = ResultsReport.raise_user_error('pre',
raise_exception=False)
elif converted_result_modifier == 'abs':
res = ResultsReport.raise_user_error('abs',
raise_exception=False)
elif converted_result_modifier == 'nd':
res = ResultsReport.raise_user_error('nd',
raise_exception=False)
else:
if converted_result and converted_result_modifier != 'ni':
res = round(float(converted_result), decimals)
if decimals == 0:
res = int(res)
if converted_result_modifier == 'low':
res = ResultsReport.raise_user_error(
'quantification_limit', (res,),
raise_exception=False)
obs_ql = True
return res, obs_ql
@classmethod
def get_initial_unit(cls, report_section, report_result_type,
notebook_line, obs_dl, obs_uncert, language):
pool = Pool()
ResultsReport = pool.get('lims.results_report.version.detail')
if not notebook_line.initial_unit:
return '', obs_dl, obs_uncert
initial_unit = notebook_line.initial_unit.rec_name
literal_result = notebook_line.literal_result
result_modifier = notebook_line.result_modifier
detection_limit = notebook_line.detection_limit
converted_result = notebook_line.converted_result
uncertainty = notebook_line.uncertainty
decimals = notebook_line.decimals
with Transaction().set_context(language=language):
if report_section == 'rp':
res = ''
if (not literal_result and result_modifier == 'eq'
and uncertainty and float(uncertainty) != 0):
res = round(float(uncertainty), decimals)
if decimals == 0:
res = int(res)
res = ResultsReport.raise_user_error(
'uncertainty', (res, ''),
raise_exception=False)
obs_uncert = True
else:
res = initial_unit
if (literal_result or initial_unit == '-' or
result_modifier in ('pos', 'neg', 'ni')):
res = ''
else:
if result_modifier in ('nd', 'low'):
if report_section == 'mi':
res = initial_unit
else:
if (not detection_limit or detection_limit in (
'0', '0.0')):
res = initial_unit
else:
res = ResultsReport.raise_user_error(
'detection_limit', (detection_limit,
initial_unit), raise_exception=False)
obs_dl = True
else:
if (not converted_result and uncertainty and
float(uncertainty) != 0):
res = round(float(uncertainty), decimals)
if decimals == 0:
res = int(res)
res = ResultsReport.raise_user_error(
'uncertainty', (res, initial_unit),
raise_exception=False)
obs_uncert = True
return res, obs_dl, obs_uncert
@classmethod
def get_final_unit(cls, report_section, report_result_type,
notebook_line, obs_dl, obs_uncert, language):
pool = Pool()
ResultsReport = pool.get('lims.results_report.version.detail')
if (report_section in ('for', 'mi', 'rp') or
report_result_type != 'both'):
return '', obs_dl, obs_uncert
if not notebook_line.final_unit:
return '', obs_dl, obs_uncert
final_unit = notebook_line.final_unit.rec_name
analysis = notebook_line.analysis.code
literal_result = notebook_line.literal_result
converted_result_modifier = notebook_line.converted_result_modifier
detection_limit = notebook_line.detection_limit
converted_result = notebook_line.converted_result
uncertainty = notebook_line.uncertainty
decimals = notebook_line.decimals
with Transaction().set_context(language=language):
res = final_unit
if (analysis == '0001' or literal_result or final_unit == '-'
or converted_result_modifier in ('pos', 'neg', 'ni')):
res = ''
else:
if converted_result_modifier in ('nd', 'low'):
if (not detection_limit or detection_limit in (
'0', '0.0')):
res = final_unit
else:
res = ResultsReport.raise_user_error(
'detection_limit', (detection_limit,
final_unit), raise_exception=False)
obs_dl = True
else:
if not converted_result:
res = ''
else:
if uncertainty and float(uncertainty) != 0:
res = round(float(uncertainty), decimals)
if decimals == 0:
res = int(res)
res = ResultsReport.raise_user_error(
'uncertainty', (res, final_unit),
raise_exception=False)
obs_uncert = True
return res, obs_dl, obs_uncert
@classmethod
def get_detection_limit(cls, report_section, report_result_type,
notebook_line, language):
detection_limit = notebook_line.detection_limit
literal_result = notebook_line.literal_result
if report_section in ('amb', 'sq'):
res = ''
else:
if (not detection_limit or detection_limit in ('0', '0.0')
or literal_result):
res = '-'
else:
res = detection_limit
return res
@classmethod
def get_reference(cls, range_type, notebook_line, language,
report_section):
pool = Pool()
LimsRange = pool.get('lims.range')
ResultsReport = pool.get('lims.results_report.version.detail')
with Transaction().set_context(language=language):
ranges = LimsRange.search([
('range_type', '=', range_type.id),
('analysis', '=', notebook_line.analysis.id),
('product_type', '=', notebook_line.product_type.id),
('matrix', '=', notebook_line.matrix.id),
])
if not ranges:
return ''
range_ = ranges[0]
if range_.reference:
return range_.reference
elif report_section == 'mi':
return ''
res = ''
if range_.min:
with Transaction().set_context(language=language):
resf = float(range_.min)
resd = abs(resf) - abs(int(resf))
if resd > 0:
res1 = str(round(range_.min, 2))
else:
res1 = str(int(range_.min))
res = ResultsReport.raise_user_error('caa_min',
(res1,), raise_exception=False)
if range_.max:
if res:
res += ' - '
with Transaction().set_context(language=language):
resf = float(range_.max)
resd = abs(resf) - abs(int(resf))
if resd > 0:
res1 = str(round(range_.max, 2))
else:
res1 = str(int(range_.max))
res += ResultsReport.raise_user_error('caa_max',
(res1,), raise_exception=False)
return res
class LimsResultReportTranscription(LimsResultReport):
'Transcription Results Report'
__name__ = 'lims.result_report.transcription'
@classmethod
def execute(cls, ids, data):
ResultsReport = Pool().get('lims.results_report.version.detail')
if len(ids) > 1:
ResultsReport.raise_user_error('multiple_reports')
results_report = ResultsReport(ids[0])
if results_report.state == 'annulled':
ResultsReport.raise_user_error('annulled_report')
if data is None:
data = {}
current_data = data.copy()
current_data['alt_lang'] = None
result_orig = super(LimsResultReport, cls).execute(ids, current_data)
current_data['alt_lang'] = 'en'
result_eng = super(LimsResultReport, cls).execute(ids, current_data)
save = False
if results_report.english_report:
if results_report.report_cache_odt_eng:
result = (results_report.report_format_odt_eng,
results_report.report_cache_odt_eng) + result_eng[2:]
else:
result = result_eng
if ('english_report' in current_data and
current_data['english_report']):
results_report.report_format_odt_eng, \
results_report.report_cache_odt_eng = result_eng[:2]
save = True
else:
if results_report.report_cache_odt:
result = (results_report.report_format_odt,
results_report.report_cache_odt) + result_orig[2:]
else:
result = result_orig
if ('english_report' in current_data and
not current_data['english_report']):
results_report.report_format_odt, \
results_report.report_cache_odt = result_orig[:2]
save = True
if save:
results_report.save()
return result
class LimsGlobalResultReport(Report):
'Global Results Report'
__name__ = 'lims.global_result_report'
@classmethod
def execute(cls, ids, data):
LimsResultsReport = Pool().get(
'lims.results_report')
result = super(LimsGlobalResultReport, cls).execute(ids, data)
results_report = LimsResultsReport(ids[0])
if results_report.english_report:
if results_report.report_cache_eng:
result = (results_report.report_format_eng,
results_report.report_cache_eng) + result[2:]
else:
if results_report.report_cache:
result = (results_report.report_format,
results_report.report_cache) + result[2:]
return result
class LimsCountersampleStoragePrintStart(ModelView):
'Countersamples Storage Report'
__name__ = 'lims.countersample.storage.print.start'
report_date_from = fields.Date('Report date from', required=True)
report_date_to = fields.Date('to', required=True)
date_from = fields.Date('Date from', required=True)
date_to = fields.Date('to', required=True)
class LimsCountersampleStoragePrint(Wizard):
'Countersamples Storage Report'
__name__ = 'lims.countersample.storage.print'
start = StateView('lims.countersample.storage.print.start',
'lims.lims_countersample_storage_print_start_view_form', [
Button('Cancel', 'end', 'tryton-cancel'),
Button('Print', 'print_', 'tryton-print', default=True),
])
print_ = StateAction('lims.report_countersample_storage')
def do_print_(self, action):
data = {
'report_date_from': self.start.report_date_from,
'report_date_to': self.start.report_date_to,
'date_from': self.start.date_from,
'date_to': self.start.date_to,
}
return action, data
def transition_print_(self):
return 'end'
class LimsCountersampleStorageReport(Report):
'Countersamples Storage Report'
__name__ = 'lims.countersample.storage.report'
@classmethod
def get_context(cls, records, data):
pool = Pool()
LimsFraction = pool.get('lims.fraction')
LimsNotebookLine = pool.get('lims.notebook.line')
report_context = super(LimsCountersampleStorageReport,
cls).get_context(records, data)
report_context['company'] = report_context['user'].company
report_context['report_date_from'] = data['report_date_from']
report_context['report_date_to'] = data['report_date_to']
report_context['date_from'] = data['date_from']
report_context['date_to'] = data['date_to']
f_list = []
fractions = LimsFraction.search([
('countersample_date', '=', None),
('sample.date2', '>=', data['date_from']),
('sample.date2', '<=', data['date_to']),
('has_results_report', '=', True),
], order=[('number', 'ASC')])
for f in fractions:
notebook_lines_ids = cls._get_fraction_notebook_lines(f.id)
if not notebook_lines_ids:
continue
notebook_lines = LimsNotebookLine.search([
('id', 'in', notebook_lines_ids),
])
if not notebook_lines:
continue
# Check not accepted (with repetitions)
to_check = []
oks = []
for line in notebook_lines:
key = line.analysis.id
if not line.accepted:
to_check.append(key)
else:
oks.append(key)
to_check = list(set(to_check))
oks = list(set(oks))
if to_check:
for key in oks:
if key in to_check:
to_check.remove(key)
if len(to_check) > 0:
continue
all_results_reported = True
for nl in notebook_lines:
if not nl.accepted:
continue
if not nl.results_report:
all_results_reported = False
break
if not cls._get_line_reported(nl, data['report_date_from'],
data['report_date_to']):
all_results_reported = False
break
if all_results_reported:
f_list.append(f)
objects = {}
for fraction in f_list:
if fraction.current_location.id not in objects:
objects[fraction.current_location.id] = {
'location': fraction.current_location.rec_name,
'fractions': [],
}
objects[fraction.current_location.id]['fractions'].append({
#'number': fraction.number,
'number': fraction.get_formated_number('pt-m-sy-sn-fn'),
'type': fraction.type.code,
'packages': '%s %s' % (fraction.packages_quantity or '',
fraction.package_type.description if fraction.package_type
else ''),
'entry_date': fraction.sample.date2,
'results_reports': cls.get_fraction_results_reports(
fraction.id),
'stp_number': (fraction.sample.entry.project.code
if fraction.sample.entry.project else ''),
'comments': (fraction.comments
if fraction.comments else ''),
})
ordered_objects = sorted(objects.values(),
key=lambda x: x['location'])
report_context['objects'] = ordered_objects
return report_context
@classmethod
def _get_fraction_notebook_lines(cls, fraction_id):
cursor = Transaction().connection.cursor()
pool = Pool()
NotebookLine = pool.get('lims.notebook.line')
EntryDetailAnalysis = pool.get('lims.entry.detail.analysis')
Service = pool.get('lims.service')
cursor.execute('SELECT nl.id '
'FROM "' + NotebookLine._table + '" nl '
'INNER JOIN "' + EntryDetailAnalysis._table + '" ad '
'ON ad.id = nl.analysis_detail '
'INNER JOIN "' + Service._table + '" srv '
'ON srv.id = nl.service '
'WHERE srv.fraction = %s '
'AND nl.report = TRUE '
'AND nl.annulled = FALSE',
(fraction_id,))
return [x[0] for x in cursor.fetchall()]
@classmethod
def _get_line_reported(cls, nl, date_from, date_to):
cursor = Transaction().connection.cursor()
pool = Pool()
ReportVersionDetailLine = pool.get(
'lims.results_report.version.detail.line')
ReportVersionDetail = pool.get('lims.results_report.version.detail')
ReportVersion = pool.get('lims.results_report.version')
cursor.execute('SELECT rvdl.id '
'FROM "' + ReportVersionDetailLine._table + '" rvdl '
'INNER JOIN "' + ReportVersionDetail._table + '" rvd '
'ON rvd.id = rvdl.report_version_detail '
'INNER JOIN "' + ReportVersion._table + '" rv '
'ON rv.id = rvd.report_version '
'WHERE rvdl.notebook_line = %s '
'AND rv.results_report = %s '
'AND DATE(COALESCE(rvd.write_date, rvd.create_date)) '
'BETWEEN %s::date AND %s::date',
(nl.id, nl.results_report.id, date_from, date_to))
return cursor.fetchone()
@classmethod
def get_fraction_results_reports(cls, fraction_id):
cursor = Transaction().connection.cursor()
pool = Pool()
LimsService = pool.get('lims.service')
LimsNotebookLine = pool.get('lims.notebook.line')
LimsResultsReport = pool.get('lims.results_report')
cursor.execute('SELECT DISTINCT(nl.results_report) '
'FROM "' + LimsService._table + '" s '
'INNER JOIN "' + LimsNotebookLine._table + '" nl '
'ON s.id = nl.service '
'WHERE s.fraction = %s '
'AND nl.results_report IS NOT NULL',
(fraction_id,))
res = cursor.fetchall()
if not res:
return ''
result = []
for report_id in res:
results_report = LimsResultsReport(report_id[0])
result.append('%s (%s)' % (results_report.rec_name,
results_report.create_date2.strftime("%d/%m/%Y")))
return ' '.join(result)
class LimsCountersampleDischargePrintStart(ModelView):
'Countersamples Discharge Report'
__name__ = 'lims.countersample.discharge.print.start'
expiry_date_from = fields.Date('Expiry date from', required=True)
expiry_date_to = fields.Date('to', required=True)
date_from = fields.Date('Date from', required=True)
date_to = fields.Date('to', required=True)
class LimsCountersampleDischargePrint(Wizard):
'Countersamples Discharge Report'
__name__ = 'lims.countersample.discharge.print'
start = StateView('lims.countersample.discharge.print.start',
'lims.lims_countersample_discharge_print_start_view_form', [
Button('Cancel', 'end', 'tryton-cancel'),
Button('Print', 'print_', 'tryton-print', default=True),
])
print_ = StateAction('lims.report_countersample_discharge')
def do_print_(self, action):
data = {
'expiry_date_from': self.start.expiry_date_from,
'expiry_date_to': self.start.expiry_date_to,
'date_from': self.start.date_from,
'date_to': self.start.date_to,
}
return action, data
def transition_print_(self):
return 'end'
class LimsCountersampleDischargeReport(Report):
'Countersamples Discharge Report'
__name__ = 'lims.countersample.discharge.report'
@classmethod
def get_context(cls, records, data):
pool = Pool()
LimsFraction = pool.get('lims.fraction')
report_context = super(LimsCountersampleDischargeReport,
cls).get_context(records, data)
report_context['company'] = report_context['user'].company
report_context['expiry_date_from'] = data['expiry_date_from']
report_context['expiry_date_to'] = data['expiry_date_to']
report_context['date_from'] = data['date_from']
report_context['date_to'] = data['date_to']
fractions = LimsFraction.search([
('discharge_date', '=', None),
('sample.date2', '>=', data['date_from']),
('sample.date2', '<=', data['date_to']),
('expiry_date', '>=', data['expiry_date_from']),
('expiry_date', '<=', data['expiry_date_to']),
], order=[('number', 'ASC')])
objects = {}
for fraction in fractions:
if fraction.current_location.id not in objects:
objects[fraction.current_location.id] = {
'location': fraction.current_location.rec_name,
'fractions': [],
}
objects[fraction.current_location.id]['fractions'].append({
'number': fraction.get_formated_number('pt-m-sy-sn-fn'),
'type': fraction.type.code,
'packages': '%s %s' % (fraction.packages_quantity or '',
fraction.package_type.description if fraction.package_type
else ''),
'entry_date': fraction.sample.date2,
'results_reports': cls.get_fraction_results_reports(
fraction.id),
'stp_number': (fraction.sample.entry.project.code
if fraction.sample.entry.project else ''),
'countersample_date': fraction.countersample_date or '',
'expiry_date': fraction.expiry_date or '',
'comments': (fraction.comments
if fraction.comments else ''),
})
ordered_objects = sorted(objects.values(),
key=lambda x: x['location'])
report_context['objects'] = ordered_objects
return report_context
@classmethod
def get_fraction_results_reports(cls, fraction_id):
cursor = Transaction().connection.cursor()
pool = Pool()
LimsService = pool.get('lims.service')
LimsNotebookLine = pool.get('lims.notebook.line')
LimsResultsReport = pool.get('lims.results_report')
cursor.execute('SELECT DISTINCT(nl.results_report) '
'FROM "' + LimsService._table + '" s '
'INNER JOIN "' + LimsNotebookLine._table + '" nl '
'ON s.id = nl.service '
'WHERE s.fraction = %s '
'AND nl.results_report IS NOT NULL',
(fraction_id,))
res = cursor.fetchall()
if not res:
return ''
result = []
for report_id in res:
results_report = LimsResultsReport(report_id[0])
result.append('%s (%s)' % (results_report.rec_name,
results_report.create_date2.strftime("%d/%m/%Y")))
return ' '.join(result)
class PrintAnalysisPendingInformStart(ModelView):
'Analysis Pending of Inform'
__name__ = 'lims.print_analysis_pending_inform.start'
date_from = fields.Date('Date from', required=True)
date_to = fields.Date('Date to', required=True)
laboratory = fields.Many2One('lims.laboratory', 'Laboratory',
required=True)
party = fields.Many2One('party.party', 'Party')
class PrintAnalysisPendingInform(Wizard):
'Analysis Pending of Inform'
__name__ = 'lims.print_analysis_pending_inform'
start = StateView('lims.print_analysis_pending_inform.start',
'lims.print_analysis_pending_inform_start_view_form', [
Button('Cancel', 'end', 'tryton-cancel'),
Button('Print', 'print_', 'tryton-print', default=True),
])
print_ = StateReport('lims.analysis_pending_inform')
def do_print_(self, action):
data = {
'date_from': self.start.date_from,
'date_to': self.start.date_to,
'laboratory': self.start.laboratory.id,
'party': self.start.party and self.start.party.id or None,
}
return action, data
def transition_print_(self):
return 'end'
class AnalysisPendingInform(Report):
'Analysis Pending of Inform'
__name__ = 'lims.analysis_pending_inform'
@classmethod
def get_context(cls, records, data):
pool = Pool()
Laboratory = pool.get('lims.laboratory')
Party = pool.get('party.party')
report_context = super(AnalysisPendingInform, cls).get_context(
records, data)
report_context['company'] = report_context['user'].company
report_context['date_from'] = data['date_from']
report_context['date_to'] = data['date_to']
report_context['laboratory'] = Laboratory(data['laboratory']).rec_name
report_context['party'] = ''
if data['party']:
report_context['party'] = Party(data['party']).rec_name
objects = cls._get_report_records(data['date_from'], data['date_to'],
data['laboratory'], data['party'])
report_context['records'] = objects
return report_context
@classmethod
def _get_report_records(cls, date_from, date_to, laboratory, party):
pool = Pool()
LimsNotebookLine = pool.get('lims.notebook.line')
res = []
excluded_notebooks = cls._get_excluded_notebooks(date_from, date_to,
laboratory, party)
if excluded_notebooks:
for n_id, a_ids in excluded_notebooks.iteritems():
clause = [
('notebook.id', '=', n_id),
('analysis', 'in', a_ids),
('laboratory', '=', laboratory),
('report', '=', True),
('annulled', '=', False),
]
excluded_lines = LimsNotebookLine.search(clause)
if excluded_lines:
res.extend(line for line in excluded_lines)
return res
@classmethod
def _get_excluded_notebooks(cls, date_from, date_to, laboratory, party):
cursor = Transaction().connection.cursor()
pool = Pool()
NotebookLine = pool.get('lims.notebook.line')
Notebook = pool.get('lims.notebook')
Fraction = pool.get('lims.fraction')
Sample = pool.get('lims.sample')
Entry = pool.get('lims.entry')
FractionType = pool.get('lims.fraction.type')
party_clause = ''
if party:
party_clause = 'AND e.party = ' + str(party)
cursor.execute('SELECT nl.notebook, nl.analysis, nl.accepted '
'FROM "' + NotebookLine._table + '" nl '
'INNER JOIN "' + Notebook._table + '" n '
'ON n.id = nl.notebook '
'INNER JOIN "' + Fraction._table + '" f '
'ON f.id = n.fraction '
'INNER JOIN "' + Sample._table + '" s '
'ON s.id = f.sample '
'INNER JOIN "' + Entry._table + '" e '
'ON e.id = s.entry '
'INNER JOIN "' + FractionType._table + '" ft '
'ON ft.id = f.type '
'WHERE ft.report = TRUE '
'AND s.date::date >= %s::date '
'AND s.date::date <= %s::date '
'AND nl.laboratory = %s '
'AND nl.report = TRUE '
'AND nl.annulled = FALSE '
+ party_clause,
(date_from, date_to, laboratory,))
notebook_lines = cursor.fetchall()
# Check accepted repetitions
to_check = []
oks = []
for line in notebook_lines:
key = (line[0], line[1])
if not line[2]:
to_check.append(key)
else:
oks.append(key)
to_check = list(set(to_check) - set(oks))
excluded_notebooks = {}
for n_id, a_id in to_check:
if n_id not in excluded_notebooks:
excluded_notebooks[n_id] = []
excluded_notebooks[n_id].append(a_id)
return excluded_notebooks
class PrintAnalysisCheckedPendingInformStart(ModelView):
'Analysis checked pending of Inform'
__name__ = 'lims.print_analysis_checked_pending_inform.start'
date_from = fields.Date('Date from', required=True)
date_to = fields.Date('Date to', required=True)
laboratory = fields.Many2One('lims.laboratory', 'Laboratory',
required=True)
party = fields.Many2One('party.party', 'Party')
class PrintAnalysisCheckedPendingInform(Wizard):
'Analysis Checked pending of Inform'
__name__ = 'lims.print_analysis_checked_pending_inform'
start = StateView('lims.print_analysis_checked_pending_inform.start',
'lims.print_analysis_checked_pending_inform_start_view_form', [
Button('Cancel', 'end', 'tryton-cancel'),
Button('Print', 'print_', 'tryton-print', default=True),
])
print_ = StateReport('lims.analysis_checked_pending_inform')
def do_print_(self, action):
data = {
'date_from': self.start.date_from,
'date_to': self.start.date_to,
'laboratory': self.start.laboratory.id,
'party': self.start.party and self.start.party.id or None,
}
return action, data
def transition_print_(self):
return 'end'
class AnalysisCheckedPendingInform(Report):
'Analysis checked pending of Inform'
__name__ = 'lims.analysis_checked_pending_inform'
@classmethod
def get_context(cls, records, data):
pool = Pool()
Laboratory = pool.get('lims.laboratory')
Party = pool.get('party.party')
report_context = super(AnalysisCheckedPendingInform, cls).get_context(
records, data)
report_context['company'] = report_context['user'].company
report_context['date_from'] = data['date_from']
report_context['date_to'] = data['date_to']
report_context['laboratory'] = Laboratory(data['laboratory']).rec_name
report_context['party'] = ''
if data['party']:
report_context['party'] = Party(data['party']).rec_name
objects = cls._get_report_records(data['date_from'], data['date_to'],
data['laboratory'], data['party'])
report_context['records'] = objects
return report_context
@classmethod
def _get_report_records(cls, date_from, date_to, laboratory, party):
pool = Pool()
LimsNotebookLine = pool.get('lims.notebook.line')
res = []
included_notebooks = cls._get_included_notebooks(date_from, date_to,
laboratory, party)
if included_notebooks:
for n_id, a_ids in included_notebooks:
clause = [
('notebook.id', '=', n_id),
('analysis', 'in', [a_ids]),
('laboratory', '=', laboratory),
('report', '=', True),
('annulled', '=', False),
]
included_lines = LimsNotebookLine.search(clause)
if included_lines:
res.extend(line for line in included_lines)
return res
@classmethod
def _get_included_notebooks(cls, date_from, date_to, laboratory, party):
cursor = Transaction().connection.cursor()
pool = Pool()
NotebookLine = pool.get('lims.notebook.line')
Notebook = pool.get('lims.notebook')
Fraction = pool.get('lims.fraction')
Sample = pool.get('lims.sample')
Entry = pool.get('lims.entry')
FractionType = pool.get('lims.fraction.type')
party_clause = ''
if party:
party_clause = 'AND e.party = ' + str(party)
cursor.execute('SELECT nl.notebook, nl.analysis '
'FROM "' + NotebookLine._table + '" nl '
'INNER JOIN "' + Notebook._table + '" n '
'ON n.id = nl.notebook '
'INNER JOIN "' + Fraction._table + '" f '
'ON f.id = n.fraction '
'INNER JOIN "' + Sample._table + '" s '
'ON s.id = f.sample '
'INNER JOIN "' + Entry._table + '" e '
'ON e.id = s.entry '
'INNER JOIN "' + FractionType._table + '" ft '
'ON ft.id = f.type '
'WHERE ft.report = TRUE '
'AND s.date::date >= %s::date '
'AND s.date::date <= %s::date '
'AND nl.laboratory = %s '
'AND nl.report = TRUE '
'AND nl.accepted = TRUE '
'AND nl.results_report IS NULL '
'AND nl.annulled = FALSE '
+ party_clause,
(date_from, date_to, laboratory,))
notebook_lines = cursor.fetchall()
return notebook_lines