trytond-sale_unit_load/sale.py

587 lines
22 KiB
Python

# The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
import math
from decimal import Decimal
from trytond.model import fields, ModelView
from trytond.pool import PoolMeta, Pool
from trytond.pyson import Eval, Bool, Not, If
from trytond.modules.product import price_digits
from trytond.transaction import Transaction
from trytond.tools import reduce_ids
from trytond.modules.stock_unit_load import cases_digits
from sql import Null
from sql.conditionals import Coalesce
from sql.aggregate import Sum
class Sale(metaclass=PoolMeta):
    """Sale extended with aggregated unit load (UL) and cases totals."""
    __name__ = 'sale.sale'

    # Both function fields share the same getter, which sums the
    # corresponding columns of the sale lines.
    ul_quantity = fields.Function(
        fields.Float('ULs', digits=(16, 0)),
        'get_ul_quantity')
    cases_quantity = fields.Function(
        fields.Float('Cases', digits=cases_digits),
        'get_ul_quantity')

    @classmethod
    def get_ul_quantity(cls, records, names):
        """Return the per-sale sums of line ULs and cases.

        :param records: sales to compute the totals for.
        :param names: requested field names (``ul_quantity`` and/or
            ``cases_quantity``).
        :return: ``{field name: {sale id: total}}``; sales without any
            line keep the initial value ``0``.
        """
        line_table = Pool().get('sale.line').__table__()
        cursor = Transaction().connection.cursor()

        zeroes = {record.id: 0 for record in records}
        totals = {name: zeroes.copy() for name in names}

        # One grouped query returns both sums; NULL columns count as 0.
        query = line_table.select(
            line_table.sale,
            Sum(Coalesce(line_table.ul_quantity, 0)),
            Sum(Coalesce(line_table.cases_quantity, 0)),
            where=(reduce_ids(line_table.sale, zeroes.keys())),
            group_by=line_table.sale)
        cursor.execute(*query)

        for sale_id, uls, cases in cursor.fetchall():
            computed = (('ul_quantity', uls), ('cases_quantity', cases))
            for field_name, amount in computed:
                # Only fill the fields that were actually requested.
                if field_name in names:
                    totals[field_name][sale_id] = amount
        return totals

    @property
    def cases_digits(self):
        """Cases digits to use on reports"""
        decimals = cases_digits[1]
        return decimals
class SaleLine(metaclass=PoolMeta):
    """Sale line extended with unit load (UL) and cases quantities."""
    __name__ = 'sale.line'

    ul_quantity = fields.Float('ULs', digits=(16, 0),
        states={'readonly': Eval('sale_state') != 'draft'},
        depends=['sale_state'])
    # Computed as quantity / ul_quantity; the setter is a no-op so the
    # value can be edited on the form without being stored.
    quantity_per_ul = fields.Function(
        fields.Float('Quantity per UL', digits=(16, Eval('unit_digits', 0)),
            depends=['unit_digits']),
        'on_change_with_quantity_per_ul', setter='set_quantity_per_ul')
    unit_loads = fields.Function(
        fields.One2Many('stock.unit_load', None, 'Unit loads'),
        'get_unit_loads')
    # Computed as cases_quantity / ul_quantity; the setter is a no-op.
    ul_cases_quantity = fields.Function(
        fields.Float('Cases per UL', digits=cases_digits,
            states={'readonly': Eval('sale_state') != 'draft'},
            depends=['sale_state']),
        'get_ul_cases_quantity', setter='set_ul_cases_quantity')
    cases_quantity = fields.Float('Cases',
        digits=cases_digits,
        states={'readonly': Eval('sale_state') != 'draft'},
        depends=['sale_state'])
    case_price = fields.Function(
        fields.Numeric('Case price', digits=price_digits),
        'get_case_price')
    quantity_per_case = fields.Function(
        fields.Float('Quantity per Case', digits=(16, Eval('unit_digits', 0)),
            depends=['unit_digits']),
        'get_quantity_per_case')

    @classmethod
    def __setup__(cls):
        """Add ``costs_case_price`` when the model has ``costs_amount``."""
        super(SaleLine, cls).__setup__()
        # The extra field only makes sense when another module provides
        # the ``costs_amount`` attribute on this model.
        if hasattr(cls, 'costs_amount'):
            cls.costs_case_price = fields.Function(
                fields.Numeric('Costs case price', digits=price_digits),
                'get_costs_case_price')

    @classmethod
    def __register__(cls, module_name):
        """Create/update the table and migrate the old cases column."""
        table_h = cls.__table_handler__(module_name)
        sql_table = cls.__table__()
        cursor = Transaction().connection.cursor()

        # Migration: "ul_cases_quantity" used to be a stored column. When
        # it still exists and "cases_quantity" does not, the total cases
        # are computed from it once the new column has been created.
        # The check must happen before super() adds "cases_quantity".
        update_cases = (not table_h.column_exist('cases_quantity') and
            table_h.column_exist('ul_cases_quantity'))

        super().__register__(module_name)

        if update_cases:
            cursor.execute(*sql_table.update(
                columns=[sql_table.cases_quantity],
                values=[sql_table.ul_cases_quantity * sql_table.ul_quantity],
                where=(
                    (sql_table.ul_cases_quantity != Null) &
                    (sql_table.ul_quantity != Null)))
            )
            # NOTE(review): nesting reconstructed — the old column is
            # assumed to be dropped only when the migration ran; confirm.
            table_h.drop_column('ul_cases_quantity')

    @fields.depends('quantity_per_ul', 'ul_quantity', 'unit')
    def on_change_quantity_per_ul(self, name=None):
        """Recompute ``quantity`` as quantity per UL times ULs.

        Nothing is changed unless quantity per UL, ULs and unit are all
        set (the unit is needed to round the result).
        """
        if self.quantity_per_ul and self.ul_quantity and self.unit:
            self.quantity = self.unit.round(
                self.quantity_per_ul * self.ul_quantity)

    @fields.depends('ul_quantity', 'quantity', 'unit')
    def on_change_with_quantity_per_ul(self, name=None):
        """Return quantity / ULs rounded with the unit, or ``None``."""
        if self.ul_quantity and self.quantity and self.unit:
            return self.unit.round(self.quantity / self.ul_quantity)
        return None

    @classmethod
    def set_quantity_per_ul(cls, records, name, value):
        """No-op setter: ``quantity_per_ul`` is never stored."""
        pass

    def get_quantity_per_case(self, name=None):
        """Return quantity / cases rounded with the unit, or ``None``."""
        if self.cases_quantity and self.quantity and self.unit:
            return self.unit.round(self.quantity / self.cases_quantity)
        return None

    @classmethod
    def set_ul_cases_quantity(cls, records, name, value):
        """No-op setter: ``ul_cases_quantity`` is never stored."""
        pass

    @fields.depends(methods=['_compute_ul_cases_quantity'])
    def on_change_cases_quantity(self):
        """Keep cases per UL in sync when total cases change."""
        self._compute_ul_cases_quantity()

    def get_unit_loads(self, name=None):
        """Return the distinct ids of unit loads linked to the moves."""
        if not self.moves:
            return []
        return list(set(m.unit_load.id for m in self.moves if m.unit_load))

    @staticmethod
    def _get_case_uom():
        """Return the UoM (``product.uom_unit``) used to round cases."""
        pool = Pool()
        Modeldata = pool.get('ir.model.data')
        Uom = pool.get('product.uom')
        return Uom(Modeldata.get_id('product', 'uom_unit'))

    @fields.depends(methods=['get_cases_quantity'])
    def on_change_ul_cases_quantity(self):
        """Recompute total cases when cases per UL change."""
        self.cases_quantity = self.get_cases_quantity()

    @fields.depends(methods=['get_cases_quantity',
        '_compute_ul_cases_quantity'])
    def on_change_ul_quantity(self):
        """Recompute total cases, then cases per UL, when ULs change."""
        self.cases_quantity = self.get_cases_quantity()
        self._compute_ul_cases_quantity()

    @fields.depends('cases_quantity', 'ul_quantity', 'ul_cases_quantity',
        methods=['get_ul_cases_quantity'])
    def _compute_ul_cases_quantity(self):
        """Update cases per UL from the total cases.

        NOTE(review): nesting reconstructed — the ``elif`` is assumed to
        pair with the inner ``if``; confirm against the original file.
        """
        if self.cases_quantity:
            if not self.ul_cases_quantity or \
                    self.ul_cases_quantity > self.cases_quantity:
                # Cases per UL is missing or exceeds the total: use the
                # total itself.
                self.ul_cases_quantity = self.cases_quantity
            elif self.ul_quantity:
                self.ul_cases_quantity = self.get_ul_cases_quantity()

    @fields.depends('cases_quantity', 'ul_quantity')
    def get_ul_cases_quantity(self, name=None):
        """Return cases / ULs rounded with the case UoM, or ``None``."""
        if self.cases_quantity and self.ul_quantity:
            return self._get_case_uom().round(
                self.cases_quantity / self.ul_quantity)
        return None

    @fields.depends('ul_cases_quantity', 'ul_quantity')
    def get_cases_quantity(self, name=None):
        """Return cases per UL * ULs rounded with the case UoM, or ``None``."""
        if self.ul_cases_quantity and self.ul_quantity:
            return self._get_case_uom().round(
                self.ul_cases_quantity * self.ul_quantity)
        return None

    @classmethod
    def get_case_price(cls, records, name=None):
        """Return ``{id: amount / cases}`` quantized to the price digits.

        Lines without cases keep ``None``.
        """
        res = {r.id: None for r in records}
        exponent = Decimal(10) ** -Decimal(price_digits[1])
        for record in records:
            if record.cases_quantity:
                # Go through str() so the float is converted using its
                # short decimal representation instead of its exact
                # binary expansion.
                cases = Decimal(str(record.cases_quantity))
                res[record.id] = (record.amount / cases).quantize(exponent)
        return res

    @classmethod
    def get_costs_case_price(cls, records, name=None):
        """Getter of ``costs_case_price``; delegates to ``_get_costs_price``."""
        return cls._get_costs_price(records, 'cases_quantity')

    @classmethod
    def _compute_ul_quantity(cls, op1, op2, operator='/'):
        """Return ``op1 / op2`` (or ``op1 * op2``) rounded up, as a float.

        :param operator: ``'/'`` divides; any other value multiplies.
        """
        if operator == '/':
            return float(math.ceil(op1 / op2))
        else:
            return float(math.ceil(op1 * op2))
class SaleLineQuickAction(metaclass=PoolMeta):
    """Quick action wizard extended to split lines by unit loads."""
    __name__ = 'sale.line.quick_action'

    def default_split(self, fields):
        """Add UL defaults taken from the active sale line."""
        pool = Pool()
        SaleLine = pool.get('sale.line')
        sale_line = SaleLine(Transaction().context['active_id'])
        defaults = super().default_split(fields)
        defaults.update({
            # ULs are mandatory on the split only when the line has them.
            'ul_required': sale_line.ul_quantity is not None,
            'ul_quantity': sale_line.ul_quantity,
            'quantity_per_ul': sale_line.quantity_per_ul,
        })
        return defaults

    def _get_split_values(self):
        """Add the UL and cases quantities to the split values."""
        pool = Pool()
        SaleLine = pool.get('sale.line')
        res = super()._get_split_values()
        sale_line = SaleLine(Transaction().context['active_id'])

        split_uls = self.split.ul_quantity
        cases_per_ul = sale_line.ul_cases_quantity
        # Either operand may be missing (line without ULs/cases, or ULs
        # not required on the wizard): leave cases unset instead of
        # failing on ``None * float``.
        cases = None
        if split_uls is not None and cases_per_ul is not None:
            cases = round(cases_per_ul * split_uls, 0)

        res.update({
            'ul_quantity': (split_uls, 0),
            'cases_quantity': (cases, 0)
        })
        return res
class SaleLineQuickActionSplit(metaclass=PoolMeta):
    """Split view of the quick action wizard with UL fields."""
    __name__ = 'sale.line.quick_action.split'

    quantity_per_ul = fields.Float('Quantity per UL', readonly=True,
        digits=(16, Eval('unit_digits', 0)),
        depends=['unit_digits'])
    ul_required = fields.Boolean('UL required')
    ul_quantity = fields.Float('ULs', digits=(16, 0),
        states={
            'required': Bool(Eval('ul_required'))
        },
        depends=['ul_required'])

    @fields.depends('ul_quantity', 'quantity_per_ul', 'unit')
    def on_change_ul_quantity(self):
        """Derive the quantity to split from the number of ULs."""
        if not (self.ul_quantity and self.quantity_per_ul):
            return
        round_qty = self.unit.round
        self.quantity = round_qty(self.ul_quantity * self.quantity_per_ul)

    @fields.depends('quantity', 'quantity_per_ul')
    def on_change_quantity(self):
        """Derive the number of ULs from the quantity to split."""
        LineModel = Pool().get('sale.line')
        if not (self.quantity and self.quantity_per_ul):
            return
        uls = LineModel._compute_ul_quantity(
            self.quantity, self.quantity_per_ul)
        self.ul_quantity = uls
class ReturnSaleStart(metaclass=PoolMeta):
    """Start view of the return sale wizard with unit load support."""
    __name__ = 'sale.return_sale.start'

    unit_loads = fields.One2Many('stock.unit_load', None, 'Unit Loads',
        domain=[('sale_line', 'in', Eval('available_lines'))],
        states={
            'invisible': Not(Bool(Eval('ul_required'))),
            'required': Bool(Eval('ul_required')),
        },
        context={'ul_extended_rec_name': True},
        depends=['ul_required', 'available_lines'])
    date = fields.Date('Return date', states={
            'invisible': (Not(Bool(Eval('ul_required'))) &
                Not(Bool(Eval('new_customer')))),
            'required': Bool(Eval('ul_required')) | Bool(Eval('new_customer')),
        }, depends=['ul_required', 'new_customer'])
    ul_required = fields.Boolean('UL required')
    new_customer = fields.Many2One('party.party', 'New Customer')
    shipment_address = fields.Many2One('party.address', 'Shipment Address',
        domain=[('party', '=', Eval('new_customer'))],
        states={
            'required': Bool(Eval('new_customer')),
            'invisible': Not(Bool(Eval('new_customer')))
        },
        depends=['new_customer'])
    # "ul_required" is evaluated by the states below, so it is declared
    # as a dependency together with "available_lines".
    lines = fields.One2Many('sale.return_sale.start.line', None,
        'Lines', required=True, readonly=True,
        domain=[('sale_line', 'in', Eval('available_lines'))],
        states={
            'invisible': Bool(Eval('ul_required'))
        },
        depends=['available_lines', 'ul_required'])
    available_lines = fields.Many2Many('sale.line', None, None,
        'Available Lines')

    @fields.depends('unit_loads', 'lines')
    def on_change_unit_loads(self):
        """Set the ULs to return on each line from the selected ULs.

        The value is the number of selected unit loads of the sale
        line, capped to the ULs of that sale line.
        """
        if self.unit_loads:
            # Count the selected unit loads per sale line.
            uls2line = {}
            for ul in self.unit_loads:
                uls2line.setdefault(ul.sale_line, 0)
                uls2line[ul.sale_line] += 1

            for sline, qty in uls2line.items():
                for line in self.lines:
                    if line.sale_line != sline:
                        continue
                    line.return_ul_quantity = min(sline.ul_quantity, qty)

    @fields.depends('new_customer')
    def on_change_new_customer(self):
        """Reset the address to the delivery one of the new customer."""
        self.shipment_address = None
        if self.new_customer:
            self.shipment_address = self.new_customer.address_get(
                type='delivery')
class ReturnSaleStartLine(ModelView):
    """Start Return Sale line"""
    __name__ = 'sale.return_sale.start.line'

    sale_line = fields.Many2One('sale.line', 'Sale line', required=True,
        readonly=True)
    ul_quantity = fields.Float('ULs', digits=(16, 0), readonly=True)
    # The comparisons with None are PYSON expressions built through the
    # "!=" operator, so they cannot be written as "is not None".
    # "has_unit_loads" is evaluated by the states, so it is declared as a
    # dependency together with "ul_quantity".
    return_ul_quantity = fields.Float('Return ULs', digits=(16, 0),
        domain=[If((Eval('ul_quantity', None) != None),
            [
                ('return_ul_quantity', '>=', 0),
                ('return_ul_quantity', '<=', Eval('ul_quantity'))
            ], [])
        ], states={
            'readonly': Bool(Eval('has_unit_loads')),
            'required': (Eval('ul_quantity', None) != None),
        }, depends=['ul_quantity', 'has_unit_loads'])
    unit = fields.Many2One('product.uom', 'Unit', readonly=True)
    unit_digits = fields.Integer('Unit digits', readonly=True)
    has_unit_loads = fields.Boolean('Has unit loads', readonly=True)
class ReturnSale(metaclass=PoolMeta):
    """Return sale wizard handling unit loads and customer change."""
    __name__ = 'sale.return_sale'

    def default_start(self, fields):
        """Return the defaults of the start view for the active sales."""
        pool = Pool()
        Date = pool.get('ir.date')

        sales = self._get_sales()
        lines = [line for s in sales for line in s.lines]
        res = {
            'date': Date.today(),
            # Unit loads must be picked as soon as any line has them.
            'ul_required': any(line.unit_loads for line in lines),
            'available_lines': list(map(int, lines)),
            'lines': [self._get_start_line(l) for l in lines]
        }
        return res

    def _get_start_line(self, sale_line):
        """Return the default values of a start line for ``sale_line``."""
        unit = sale_line.unit
        return {
            'sale_line': sale_line.id,
            'ul_quantity': sale_line.ul_quantity,
            'return_ul_quantity': 0.0,
            'unit': unit.id if unit else None,
            # A conditional expression is required here: a unit with 0
            # digits must give 0, not the fallback value.
            'unit_digits': unit.digits if unit else 2,
            'has_unit_loads': bool(sale_line.unit_loads)
        }

    def do_return_(self, action):
        """Override method and not call super

        Creates one return sale per original sale having lines to
        return (plus an optional new sale for the new customer), creates
        the return shipments, links the returned unit loads to them and
        quotes the return sales.

        :return: the ``(action, data)`` pair, ``data['res_id']`` holding
            the ids of the created sales.
        """
        pool = Pool()
        Sale = pool.get('sale.sale')
        SaleLine = pool.get('sale.line')
        Move = pool.get('stock.move')

        sales = self._get_sales()
        saleline2uls = self._get_sale_line_uls()
        # Maps each returned unit load id to its return sale line.
        ul2returnlines = {}
        new_sale = None
        if self.start.new_customer:
            # Create new sale with new customer
            # TODO: add uls to new sale
            new_sale = self._create_new_sale(saleline2uls, {
                'party': self.start.new_customer,
                'shipment_address': self.start.shipment_address,
                'shipment_party': None,
                'sale_date': self.start.date,
                'lines': None,
            })

        return_sales = []
        for sale in sales:
            return_sale = None
            return_lines = []
            for line in sale.lines:
                if line.id not in saleline2uls:
                    continue
                # todo: copy lines of other types (comment, subtotal)
                if return_sale is None:
                    # The return sale is created lazily, only when the
                    # sale has at least one line to return.
                    return_sale = self._create_return_sale(sale)
                    if not return_sale:
                        # method "_create_return_sale" can return none
                        continue
                return_defvalues = self._get_return_line_defvalues(line)
                return_defvalues['sale'] = return_sale.id
                return_line = SaleLine.copy([line], default=return_defvalues)
                if not return_line:
                    # copy can return anything
                    continue
                return_line, = return_line
                return_uls = saleline2uls[line.id]
                # store uls for current return sale line
                if isinstance(return_uls, list):
                    ul2returnlines.update({
                        ul.id: return_line for ul in return_uls
                    })
                self._update_return_line_values(return_line, line, return_uls)
                return_lines.append(return_line)

            if not return_sale:
                continue
            return_sale.lines = return_lines
            return_sale.save()
            return_sales.append(return_sale)

            moves_todel = []
            return_shipments = return_sale.create_shipment('return') or []
            return_sale.shipment_method = 'manual'
            for return_shipment in return_shipments:
                # as can be many return shipments, we need to identify
                # which uls to link based on return sale lines and
                # origin of moves
                origins = [m.origin for m in return_shipment.incoming_moves]
                return_uls = [ul
                    for ul, return_line in ul2returnlines.items()
                    if return_line in origins]
                # The moves created with the shipment are dropped in
                # favour of the ones generated from the unit loads.
                moves_todel.extend(return_shipment.incoming_moves)
                return_shipment.unit_loads = return_uls
                return_shipment.on_change_unit_loads()
                for move in return_shipment.incoming_moves:
                    if move.unit_load and \
                            move.product == move.unit_load.product:
                        # set origin again
                        move.origin = ul2returnlines[move.unit_load.id]
            if moves_todel:
                Move.delete(moves_todel)
            if return_shipments:
                # Take the model from the list itself rather than from
                # the variable left over by the loop above.
                return_shipments[0].__class__.save(return_shipments)

        if return_sales:
            Sale.quote(return_sales)

        data = {'res_id': [s.id for s in return_sales]}
        if new_sale:
            data['res_id'].append(new_sale.id)
        if len(data['res_id']) == 1:
            action['views'].reverse()
        return action, data

    def _get_sale_line_uls(self):
        """Return what must be returned for each sale line.

        :return: ``{sale line id: value}`` where the value is the number
            of ULs typed on the start line (lines without unit loads) or
            the list of selected unit loads.
        """
        UnitLoad = Pool().get('stock.unit_load')

        res = {}
        for line in self.start.lines:
            if line.has_unit_loads or not line.return_ul_quantity:
                continue
            res.setdefault(line.sale_line.id, line.return_ul_quantity)
        for ul in UnitLoad.browse(list(map(int, self.start.unit_loads))):
            res.setdefault(ul.sale_line.id, []).append(ul)
        return res

    def _get_return_line_defvalues(self, line):
        """Hook: default values used when copying a line to return."""
        return {}

    def _update_return_line_values(self, return_line, original_line, uls):
        """Set the (negative) quantities of ``return_line``.

        :param uls: number of ULs to return (float) or the list of unit
            loads to return. Any other value leaves the line untouched.
        """
        if isinstance(uls, float):
            return_line.ul_quantity = -uls
            return_line.quantity = - return_line.unit.round(
                original_line.quantity_per_ul * uls)
            return_line.cases_quantity = - round(
                original_line.ul_cases_quantity * uls)
        elif isinstance(uls, list) and uls:
            return_line.ul_quantity = - float(len(uls))
            uom = uls[0].uom or return_line.unit
            # change uom to get precision due to return quantity normally
            # would be lower than original sale line quantity
            return_line.unit_price = uom.compute_price(return_line.unit,
                return_line.unit_price, uom)
            return_line.unit = uom
            return_line.quantity = - sum(return_line.unit.compute_qty(
                ul.uom, ul.quantity, return_line.unit) for ul in uls
            )
            return_line.cases_quantity = - sum(ul.cases_quantity for ul in uls)

    def _create_return_sale(self, original_sale):
        """Copy ``original_sale`` without lines as its return sale.

        :return: the new sale, or ``None`` when the copy gives nothing.
        """
        pool = Pool()
        Sale = pool.get('sale.sale')

        defvalues = {
            'sale_date': self.start.date,
            'origin': '%s,%s' % (original_sale.__name__, original_sale.id),
            'shipment_method': 'order',
            'lines': None
        }
        if original_sale.shipping_date:
            defvalues['shipping_date'] = self.start.date
        return_sale = Sale.copy([original_sale], defvalues)
        return return_sale[0] if return_sale else None

    def _create_new_sale(self, saleline2uls, values):
        """Create the sale of the new customer with the returned lines.

        :param saleline2uls: mapping built by ``_get_sale_line_uls``.
        :param values: values used to instantiate the new sale.
        :return: the saved sale, read again by id.
        """
        pool = Pool()
        Sale = pool.get('sale.sale')
        SaleLine = pool.get('sale.line')

        sale = Sale(**values)
        sale.on_change_party()
        # Set the chosen address again after on_change_party has run.
        sale.shipment_address = values.get('shipment_address', None)
        sale.save()

        lines = []
        for line_id, uls in saleline2uls.items():
            line = SaleLine(line_id)
            newline_defvalues = self._get_new_line_defvalues(line)
            newline_defvalues['sale'] = sale.id
            new_line, = SaleLine.copy([line], default=newline_defvalues)
            self._update_new_line_values(new_line, line, uls)
            lines.append(new_line)
        SaleLine.save(lines)
        return Sale(sale.id)

    def _get_new_line_defvalues(self, line):
        """Hook: default values used when copying a line to the new sale."""
        return {}

    def _update_new_line_values(self, new_line, original_line, uls):
        """Set taxes and quantities of a line of the new customer sale.

        :param uls: number of ULs (float) or list of unit loads, as built
            by ``_get_sale_line_uls``.
        """
        party = new_line.sale.party
        # The tax rule of the new customer does not change in the loop.
        tax_rule = party and party.customer_tax_rule

        taxes = []
        pattern = new_line._get_tax_rule_pattern()
        for tax in new_line.product.customer_taxes_used:
            if tax_rule:
                tax_ids = tax_rule.apply(tax, pattern)
                if tax_ids:
                    taxes.extend(tax_ids)
                continue
            taxes.append(tax.id)
        if tax_rule:
            tax_ids = tax_rule.apply(None, pattern)
            if tax_ids:
                taxes.extend(tax_ids)
        new_line.taxes = taxes

        if isinstance(uls, float):
            new_line.ul_quantity = uls
            new_line.quantity = new_line.unit.round(
                original_line.quantity_per_ul * uls)
            new_line.cases_quantity = round(
                original_line.ul_cases_quantity * uls)
        elif isinstance(uls, list) and uls:
            new_line.ul_quantity = float(len(uls))
            uom = uls[0].uom
            # Express price and quantity in the uom of the unit loads.
            new_line.unit_price = uom.compute_price(new_line.unit,
                new_line.unit_price, uom)
            new_line.unit = uom
            new_line.quantity = sum(new_line.unit.compute_qty(
                ul.uom, ul.quantity, new_line.unit) for ul in uls
            )
            new_line.cases_quantity = sum(ul.cases_quantity for ul in uls)

    def _get_sales(self):
        """Return the sales selected when the wizard was launched."""
        pool = Pool()
        Sale = pool.get('sale.sale')
        return Sale.browse(Transaction().context['active_ids'])