581 lines
21 KiB
Python
581 lines
21 KiB
Python
# This file is part of lims_sale module for Tryton.
|
|
# The COPYRIGHT file at the top level of this repository contains
|
|
# the full copyright notices and license terms.
|
|
|
|
from trytond.model import ModelSQL, fields
|
|
from trytond.pool import Pool, PoolMeta
|
|
from trytond.pyson import Eval, Or
|
|
from trytond.transaction import Transaction
|
|
from trytond.exceptions import UserError, UserWarning
|
|
from trytond.i18n import gettext
|
|
|
|
|
|
class CreateSampleStart(metaclass=PoolMeta):
|
|
__name__ = 'lims.create_sample.start'
|
|
|
|
sale_lines_filter_product_type_matrix = fields.Boolean(
|
|
'Filter Quotes by Product type and Matrix')
|
|
sale_lines = fields.Many2Many('sale.line', None, None, 'Quotes',
|
|
domain=[('id', 'in', Eval('sale_lines_domain'))],
|
|
states={'readonly': Or(~Eval('product_type'), ~Eval('matrix'))},
|
|
depends=['sale_lines_domain', 'product_type', 'matrix'])
|
|
sale_lines_domain = fields.Function(fields.Many2Many('sale.line',
|
|
None, None, 'Quotes domain'),
|
|
'on_change_with_sale_lines_domain')
|
|
|
|
@staticmethod
|
|
def default_sale_lines_filter_product_type_matrix():
|
|
return False
|
|
|
|
@fields.depends('party', 'product_type', 'matrix',
|
|
'sale_lines_filter_product_type_matrix')
|
|
def on_change_with_sale_lines_domain(self, name=None):
|
|
cursor = Transaction().connection.cursor()
|
|
pool = Pool()
|
|
Date = pool.get('ir.date')
|
|
SaleLine = pool.get('sale.line')
|
|
Analysis = pool.get('lims.analysis')
|
|
|
|
if not self.party or not self.product_type or not self.matrix:
|
|
return []
|
|
|
|
analysis_domain = super().on_change_with_analysis_domain()
|
|
if not analysis_domain:
|
|
return []
|
|
analysis_ids = ', '.join(str(a) for a in analysis_domain)
|
|
|
|
cursor.execute('SELECT DISTINCT(product) '
|
|
'FROM "' + Analysis._table + '" '
|
|
'WHERE id IN (' + analysis_ids + ')')
|
|
res = cursor.fetchall()
|
|
if not res:
|
|
return []
|
|
product_ids = [x[0] for x in res]
|
|
|
|
today = Date.today()
|
|
clause = [
|
|
('sale.party', '=', self.party.id),
|
|
('sale.expiration_date', '>=', today),
|
|
('sale.state', 'in', [
|
|
'quotation', 'confirmed', 'processing',
|
|
]),
|
|
('product.id', 'in', product_ids),
|
|
]
|
|
if self.sale_lines_filter_product_type_matrix:
|
|
clause.append(('product_type', '=', self.product_type.id))
|
|
clause.append(('matrix', '=', self.matrix.id))
|
|
|
|
sale_lines = SaleLine.search(clause)
|
|
res = [sl.id for sl in sale_lines if not sl.services_completed]
|
|
return res
|
|
|
|
@fields.depends('party', 'product_type', 'matrix', 'sale_lines')
|
|
def on_change_with_analysis_domain(self, name=None):
|
|
pool = Pool()
|
|
Analysis = pool.get('lims.analysis')
|
|
Entry = pool.get('lims.entry')
|
|
|
|
entry = Entry(Transaction().context['active_id'])
|
|
if (not self.sale_lines and
|
|
not entry.allow_services_without_quotation):
|
|
return []
|
|
|
|
analysis_domain = super().on_change_with_analysis_domain(name)
|
|
|
|
if not self.sale_lines or entry.allow_services_without_quotation:
|
|
return analysis_domain
|
|
|
|
quoted_products = [sl.product.id
|
|
for sl in self.sale_lines if sl.product]
|
|
quoted_analysis = Analysis.search([('product', 'in', quoted_products)])
|
|
quoted_analysis_ids = [a.id for a in quoted_analysis]
|
|
return [a for a in analysis_domain if a in quoted_analysis_ids]
|
|
|
|
@fields.depends('sale_lines', 'product_type', 'matrix', 'services',
|
|
methods=['on_change_with_analysis_domain'])
|
|
def on_change_sale_lines(self, name=None):
|
|
if not self.sale_lines:
|
|
return
|
|
self.load_sale_lines_analyzes()
|
|
|
|
def load_sale_lines_analyzes(self):
|
|
pool = Pool()
|
|
Analysis = pool.get('lims.analysis')
|
|
CreateSampleService = pool.get('lims.create_sample.service')
|
|
|
|
analysis_domain = self.on_change_with_analysis_domain()
|
|
if not analysis_domain:
|
|
return
|
|
|
|
quoted_products = [sl.product.id
|
|
for sl in self.sale_lines if sl.product]
|
|
quoted_analysis = Analysis.search([('product', 'in', quoted_products)])
|
|
quoted_analysis = [a for a in quoted_analysis
|
|
if a.id in analysis_domain]
|
|
if not quoted_analysis:
|
|
return
|
|
|
|
quoted_services = []
|
|
for a in quoted_analysis:
|
|
with Transaction().set_context(
|
|
product_type=self.product_type.id,
|
|
matrix=self.matrix.id):
|
|
s = CreateSampleService()
|
|
s.analysis_locked = False
|
|
s.urgent = s.default_urgent()
|
|
s.priority = s.default_priority()
|
|
s.analysis = a
|
|
s.on_change_analysis()
|
|
s.laboratory_date = s.on_change_with_laboratory_date()
|
|
s.report_date = s.on_change_with_report_date()
|
|
|
|
quoted_services.append(s)
|
|
|
|
self.services = quoted_services
|
|
|
|
|
|
class CreateSample(metaclass=PoolMeta):
|
|
__name__ = 'lims.create_sample'
|
|
|
|
def _get_samples_defaults(self, entry_id):
|
|
pool = Pool()
|
|
Analysis = pool.get('lims.analysis')
|
|
Entry = pool.get('lims.entry')
|
|
Warning = pool.get('res.user.warning')
|
|
|
|
samples_defaults = super()._get_samples_defaults(entry_id)
|
|
|
|
if (not hasattr(self.start, 'sale_lines') or
|
|
not hasattr(self.start, 'services')):
|
|
return samples_defaults
|
|
|
|
sale_lines = {}
|
|
for sl in self.start.sale_lines:
|
|
analysis_id = sl.analysis and sl.analysis.id
|
|
if not analysis_id:
|
|
product_id = sl.product and sl.product.id
|
|
if not product_id:
|
|
continue
|
|
analysis = Analysis.search([('product', '=', product_id)])
|
|
if not analysis:
|
|
continue
|
|
analysis_id = analysis[0].id
|
|
sale_lines[analysis_id] = {
|
|
'line': sl.id,
|
|
'available': sl.services_available,
|
|
}
|
|
if not sale_lines:
|
|
return samples_defaults
|
|
|
|
entry = Entry(Transaction().context['active_id'])
|
|
allow_services_without_quotation = (
|
|
entry.allow_services_without_quotation)
|
|
error_key = 'lims_services_without_quotation@%s' % entry.number
|
|
error_msg = 'lims_sale.msg_services_without_quotation'
|
|
labels_qty = len(self._get_labels_list(self.start.labels))
|
|
|
|
for sample in samples_defaults:
|
|
if 'fractions' not in sample:
|
|
continue
|
|
for fraction_defaults in sample['fractions']:
|
|
if 'create' not in fraction_defaults[0]:
|
|
continue
|
|
for fraction in fraction_defaults[1]:
|
|
if 'services' not in fraction:
|
|
continue
|
|
for services_defaults in fraction['services']:
|
|
if 'create' not in services_defaults[0]:
|
|
continue
|
|
for service in services_defaults[1]:
|
|
analysis_id = service['analysis']
|
|
if analysis_id not in sale_lines:
|
|
continue
|
|
service['sale_lines'] = [('add',
|
|
[sale_lines[analysis_id]['line']])]
|
|
if (sale_lines[analysis_id]['available'] is None or
|
|
sale_lines[analysis_id]['available'] >=
|
|
labels_qty):
|
|
continue
|
|
if not allow_services_without_quotation:
|
|
raise UserError(gettext(error_msg))
|
|
if Warning.check(error_key):
|
|
raise UserWarning(error_key,
|
|
gettext(error_msg))
|
|
|
|
return samples_defaults
|
|
|
|
|
|
class AddSampleServiceStart(metaclass=PoolMeta):
|
|
__name__ = 'lims.sample.add_service.start'
|
|
|
|
party = fields.Many2One('party.party', 'Party')
|
|
sale_lines_filter_product_type_matrix = fields.Boolean(
|
|
'Filter Quotes by Product type and Matrix')
|
|
sale_lines = fields.Many2Many('sale.line', None, None, 'Quotes',
|
|
domain=[('id', 'in', Eval('sale_lines_domain'))],
|
|
states={'readonly': Or(~Eval('product_type'), ~Eval('matrix'))},
|
|
depends=['sale_lines_domain', 'product_type', 'matrix'])
|
|
sale_lines_domain = fields.Function(fields.Many2Many('sale.line',
|
|
None, None, 'Quotes domain'),
|
|
'on_change_with_sale_lines_domain')
|
|
|
|
@staticmethod
|
|
def default_sale_lines_filter_product_type_matrix():
|
|
return False
|
|
|
|
@fields.depends('party', 'product_type', 'matrix',
|
|
'sale_lines_filter_product_type_matrix', 'analysis_domain')
|
|
def on_change_with_sale_lines_domain(self, name=None):
|
|
cursor = Transaction().connection.cursor()
|
|
pool = Pool()
|
|
Date = pool.get('ir.date')
|
|
SaleLine = pool.get('sale.line')
|
|
Analysis = pool.get('lims.analysis')
|
|
Sample = pool.get('lims.sample')
|
|
|
|
if not self.party or not self.product_type or not self.matrix:
|
|
return []
|
|
|
|
sample = Sample(Transaction().context['active_id'])
|
|
analysis_domain = sample.on_change_with_analysis_domain()
|
|
if not analysis_domain:
|
|
return []
|
|
analysis_ids = ', '.join(str(a) for a in analysis_domain)
|
|
|
|
cursor.execute('SELECT DISTINCT(product) '
|
|
'FROM "' + Analysis._table + '" '
|
|
'WHERE id IN (' + analysis_ids + ')')
|
|
res = cursor.fetchall()
|
|
if not res:
|
|
return []
|
|
product_ids = [x[0] for x in res]
|
|
|
|
today = Date.today()
|
|
clause = [
|
|
('sale.party', '=', self.party.id),
|
|
('sale.expiration_date', '>=', today),
|
|
('sale.state', 'in', [
|
|
'quotation', 'confirmed', 'processing',
|
|
]),
|
|
('product.id', 'in', product_ids),
|
|
]
|
|
if self.sale_lines_filter_product_type_matrix:
|
|
clause.append(('product_type', '=', self.product_type.id))
|
|
clause.append(('matrix', '=', self.matrix.id))
|
|
|
|
sale_lines = SaleLine.search(clause)
|
|
res = [sl.id for sl in sale_lines if not sl.services_completed]
|
|
return res
|
|
|
|
@fields.depends('party', 'product_type', 'matrix', 'sale_lines')
|
|
def on_change_with_analysis_domain(self, name=None):
|
|
pool = Pool()
|
|
Analysis = pool.get('lims.analysis')
|
|
Sample = pool.get('lims.sample')
|
|
|
|
sample = Sample(Transaction().context['active_id'])
|
|
if (not self.sale_lines and
|
|
not sample.entry.allow_services_without_quotation):
|
|
return []
|
|
|
|
analysis_domain = sample.on_change_with_analysis_domain()
|
|
|
|
if not self.sale_lines:
|
|
return analysis_domain
|
|
|
|
quoted_products = [sl.product.id
|
|
for sl in self.sale_lines if sl.product]
|
|
quoted_analysis = Analysis.search([('product', 'in', quoted_products)])
|
|
quoted_analysis_ids = [a.id for a in quoted_analysis]
|
|
return [a for a in analysis_domain if a in quoted_analysis_ids]
|
|
|
|
@fields.depends('sale_lines', 'product_type', 'matrix', 'services',
|
|
methods=['on_change_with_analysis_domain'])
|
|
def on_change_sale_lines(self, name=None):
|
|
if not self.sale_lines:
|
|
return
|
|
self.load_sale_lines_analyzes()
|
|
|
|
def load_sale_lines_analyzes(self):
|
|
pool = Pool()
|
|
Analysis = pool.get('lims.analysis')
|
|
CreateSampleService = pool.get('lims.create_sample.service')
|
|
|
|
analysis_domain = self.on_change_with_analysis_domain()
|
|
if not analysis_domain:
|
|
return
|
|
|
|
quoted_products = [sl.product.id
|
|
for sl in self.sale_lines if sl.product]
|
|
quoted_analysis = Analysis.search([('product', 'in', quoted_products)])
|
|
quoted_analysis = [a for a in quoted_analysis
|
|
if a.id in analysis_domain]
|
|
if not quoted_analysis:
|
|
return
|
|
|
|
quoted_services = []
|
|
for a in quoted_analysis:
|
|
with Transaction().set_context(
|
|
product_type=self.product_type.id,
|
|
matrix=self.matrix.id):
|
|
s = CreateSampleService()
|
|
s.analysis_locked = False
|
|
s.urgent = s.default_urgent()
|
|
s.priority = s.default_priority()
|
|
s.analysis = a
|
|
s.on_change_analysis()
|
|
s.laboratory_date = s.on_change_with_laboratory_date()
|
|
s.report_date = s.on_change_with_report_date()
|
|
|
|
quoted_services.append(s)
|
|
|
|
self.services = quoted_services
|
|
|
|
|
|
class AddSampleService(metaclass=PoolMeta):
|
|
__name__ = 'lims.sample.add_service'
|
|
|
|
def default_start(self, fields):
|
|
Sample = Pool().get('lims.sample')
|
|
|
|
defaults = super().default_start(fields)
|
|
|
|
sample = Sample(Transaction().context['active_id'])
|
|
if not sample:
|
|
return defaults
|
|
|
|
defaults['party'] = sample.party.id
|
|
return defaults
|
|
|
|
def _get_new_service(self, service, fraction):
|
|
pool = Pool()
|
|
Analysis = pool.get('lims.analysis')
|
|
Sample = pool.get('lims.sample')
|
|
Warning = pool.get('res.user.warning')
|
|
|
|
service_create = super()._get_new_service(service, fraction)
|
|
|
|
if not hasattr(self.start, 'sale_lines'):
|
|
return service_create
|
|
|
|
sale_lines = {}
|
|
for sl in self.start.sale_lines:
|
|
analysis_id = sl.analysis and sl.analysis.id
|
|
if not analysis_id:
|
|
product_id = sl.product and sl.product.id
|
|
if not product_id:
|
|
continue
|
|
analysis = Analysis.search([('product', '=', product_id)])
|
|
if not analysis:
|
|
continue
|
|
analysis_id = analysis[0].id
|
|
sale_lines[analysis_id] = {
|
|
'line': sl.id,
|
|
'available': sl.services_available,
|
|
}
|
|
if not sale_lines:
|
|
return service_create
|
|
|
|
analysis_id = service_create['analysis']
|
|
if analysis_id not in sale_lines:
|
|
return service_create
|
|
service_create['sale_lines'] = [('add',
|
|
[sale_lines[analysis_id]['line']])]
|
|
if (sale_lines[analysis_id]['available'] is None or
|
|
sale_lines[analysis_id]['available'] >= 1):
|
|
return service_create
|
|
|
|
sample = Sample(Transaction().context['active_id'])
|
|
error_key = 'lims_services_without_quotation@%s' % sample.entry.number
|
|
error_msg = 'lims_sale.msg_party_services_without_quotation'
|
|
warning_msg = 'lims_sale.msg_adding_services_without_quotation'
|
|
if not sample.entry.allow_services_without_quotation:
|
|
raise UserError(gettext(error_msg))
|
|
if Warning.check(error_key):
|
|
raise UserWarning(error_key, gettext(warning_msg))
|
|
|
|
return service_create
|
|
|
|
|
|
class EditSampleService(metaclass=PoolMeta):
|
|
__name__ = 'lims.sample.edit_service'
|
|
|
|
def _get_new_service(self, service, fraction):
|
|
pool = Pool()
|
|
Sample = pool.get('lims.sample')
|
|
Warning = pool.get('res.user.warning')
|
|
|
|
service_create = super()._get_new_service(service, fraction)
|
|
|
|
sample = Sample(Transaction().context['active_id'])
|
|
error_key = 'lims_services_without_quotation@%s' % sample.entry.number
|
|
error_msg = 'lims_sale.msg_party_services_without_quotation'
|
|
warning_msg = 'lims_sale.msg_adding_services_without_quotation'
|
|
if not sample.entry.allow_services_without_quotation:
|
|
raise UserError(gettext(error_msg))
|
|
|
|
if Warning.check(error_key):
|
|
raise UserWarning(error_key, gettext(warning_msg))
|
|
|
|
return service_create
|
|
|
|
|
|
class EditSample(metaclass=PoolMeta):
|
|
__name__ = 'lims.sample.edit'
|
|
|
|
def transition_confirm(self):
|
|
pool = Pool()
|
|
ServiceSaleLine = pool.get('lims.service-sale.line')
|
|
|
|
error_msg = 'lims_sale.msg_party_services_without_quotation'
|
|
|
|
samples = self._get_filtered_samples()
|
|
for sample in samples:
|
|
if self.start.party and self.start.party != sample.party:
|
|
sale_lines = ServiceSaleLine.search([
|
|
('service.fraction.sample', '=', sample.id),
|
|
])
|
|
if not sale_lines:
|
|
continue
|
|
if self.start.party.allow_services_without_quotation:
|
|
ServiceSaleLine.delete(sale_lines)
|
|
else:
|
|
raise UserError(gettext(error_msg))
|
|
return super().transition_confirm()
|
|
|
|
|
|
class Sample(metaclass=PoolMeta):
|
|
__name__ = 'lims.sample'
|
|
|
|
sale_lines = fields.Function(fields.Many2Many('sale.line',
|
|
None, None, 'Quotes'), 'get_sale_lines')
|
|
|
|
def get_sale_lines(self, name):
|
|
pool = Pool()
|
|
ServiceSaleLine = pool.get('lims.service-sale.line')
|
|
sale_lines = ServiceSaleLine.search([
|
|
('service.fraction.sample', '=', self.id),
|
|
])
|
|
return [sl.sale_line.id for sl in sale_lines]
|
|
|
|
|
|
class Service(metaclass=PoolMeta):
|
|
__name__ = 'lims.service'
|
|
|
|
sale_lines = fields.Many2Many('lims.service-sale.line',
|
|
'service', 'sale_line', 'Quotes', readonly=True)
|
|
matching_quote_removed = fields.Many2One('sale.line',
|
|
'Matching quote removed', readonly=True)
|
|
|
|
@classmethod
|
|
def create(cls, vlist):
|
|
services = super().create(vlist)
|
|
silent = Transaction().context.get('create_sample', False)
|
|
cls.check_services_without_quotation(services, silent)
|
|
return services
|
|
|
|
@classmethod
|
|
def copy(cls, services, default=None):
|
|
new_service = super().copy(services, default)
|
|
silent = Transaction().context.get('create_sample', False)
|
|
cls.check_services_without_quotation(new_service, silent)
|
|
return new_service
|
|
|
|
@classmethod
|
|
def check_services_without_quotation(cls, services, silent=False):
|
|
pool = Pool()
|
|
Warning = pool.get('res.user.warning')
|
|
|
|
to_unlink = []
|
|
for service in services:
|
|
if not service.sale_lines:
|
|
continue
|
|
|
|
entry = service.entry
|
|
allow_services_without_quotation = (
|
|
entry.allow_services_without_quotation)
|
|
error_key = 'lims_services_without_quotation@%s' % entry.number
|
|
error_msg = 'lims_sale.msg_services_without_quotation'
|
|
|
|
for sl in service.sale_lines:
|
|
if (sl.quantity is None or sl.unlimited_quantity or
|
|
sl.quantity >= len(sl.services)):
|
|
continue
|
|
if not allow_services_without_quotation:
|
|
raise UserError(gettext(error_msg))
|
|
if not silent and Warning.check(error_key):
|
|
raise UserWarning(error_key, gettext(error_msg))
|
|
service.matching_quote_removed = sl.id
|
|
service.save()
|
|
to_unlink.append(service)
|
|
if to_unlink:
|
|
cls.unlink_sale_lines(to_unlink)
|
|
|
|
@classmethod
|
|
def write(cls, *args):
|
|
super().write(*args)
|
|
actions = iter(args)
|
|
for services, vals in zip(actions, actions):
|
|
if vals.get('annulled'):
|
|
cls.unlink_sale_lines(services)
|
|
|
|
@classmethod
|
|
def unlink_sale_lines(cls, services):
|
|
pool = Pool()
|
|
ServiceSaleLine = pool.get('lims.service-sale.line')
|
|
sale_lines = ServiceSaleLine.search([
|
|
('service', 'in', [s.id for s in services]),
|
|
])
|
|
if sale_lines:
|
|
ServiceSaleLine.delete(sale_lines)
|
|
|
|
|
|
class Service2(metaclass=PoolMeta):
|
|
__name__ = 'lims.service'
|
|
|
|
def get_invoice_line(self):
|
|
invoice_line = super().get_invoice_line()
|
|
if not invoice_line:
|
|
return
|
|
if self.sale_lines:
|
|
for sale_line in self.sale_lines:
|
|
if sale_line.product.id == self.analysis.product.id:
|
|
invoice_line['lims_sale_line_origin'] = sale_line.id
|
|
invoice_line['unit_price'] = sale_line.unit_price
|
|
return invoice_line
|
|
|
|
|
|
class ServiceSaleLine(ModelSQL):
|
|
'Service - Sale Line'
|
|
__name__ = 'lims.service-sale.line'
|
|
_table = 'lims_service_sale_line'
|
|
|
|
service = fields.Many2One('lims.service', 'Service',
|
|
ondelete='CASCADE', select=True, required=True)
|
|
sale_line = fields.Many2One('sale.line', 'Sale Line',
|
|
ondelete='CASCADE', select=True, required=True)
|
|
|
|
@classmethod
|
|
def create(cls, vlist):
|
|
sale_lines = super().create(vlist)
|
|
with Transaction().set_context(_check_access=False):
|
|
sales = set(sl.sale_line.sale for sl in sale_lines)
|
|
if sales:
|
|
cls.process_sale(sales)
|
|
return sale_lines
|
|
|
|
@classmethod
|
|
def delete(cls, sale_lines):
|
|
with Transaction().set_context(_check_access=False):
|
|
sales = set(sl.sale_line.sale for sl in sale_lines)
|
|
super().delete(sale_lines)
|
|
if sales:
|
|
cls.process_sale(sales)
|
|
|
|
@classmethod
|
|
def process_sale(cls, sales):
|
|
pool = Pool()
|
|
Sale = pool.get('sale.sale')
|
|
with Transaction().set_context(_check_access=False):
|
|
Sale.__queue__.process(sales)
|