mirror of
https://gitlab.com/datalifeit/trytond-stock_location_product_limit
synced 2023-12-14 04:23:00 +01:00
dc22a4df3e
This commit refs #24897
434 lines
16 KiB
Python
434 lines
16 KiB
Python
# The COPYRIGHT file at the top level of
|
|
# this repository contains the full copyright notices and license terms.
|
|
from datetime import date
|
|
from dateutil.relativedelta import relativedelta
|
|
from trytond.model import Unique
|
|
from trytond.report import Report
|
|
from trytond.transaction import Transaction
|
|
from trytond.model import ModelSQL, ModelView, fields
|
|
from trytond.pyson import Eval
|
|
from trytond.pool import Pool, PoolMeta
|
|
from trytond.i18n import gettext
|
|
from trytond.exceptions import UserWarning
|
|
from trytond.wizard import StateReport
|
|
from trytond.wizard import Wizard, StateTransition, StateView, Button
|
|
|
|
|
|
class ProductLimit(ModelSQL, ModelView):
    """Location product limit"""
    __name__ = 'stock.location.product_limit'

    # Location the limit belongs to; the domain restricts it to locations of
    # type 'warehouse' and the selected product is forwarded in the context.
    location = fields.Many2One('stock.location', 'Location',
        ondelete='CASCADE', required=True,
        domain=[('type', '=', 'warehouse'), ],
        context={'product': Eval('product')},
        depends=['product'])
    # Product the limit is defined for.
    product = fields.Many2One('product.product', 'Product',
        required=True, ondelete='RESTRICT')
    # Limit amount, expressed in `uom` and displayed with `uom_digits`.
    quantity = fields.Float('Quantity', required=True,
        digits=(16, Eval('uom_digits', 2)),
        depends=['uom_digits'])
    # Category of the product's default UOM; only used to filter `uom`.
    uom_category = fields.Function(
        fields.Many2One('product.uom.category', 'UOM Category'),
        'on_change_with_uom_category')
    # Unit of `quantity`; must belong to `uom_category`.
    uom = fields.Many2One('product.uom', 'UOM', required=True,
        ondelete='RESTRICT', domain=[
            ('category', '=', Eval('uom_category'))],
        depends=['uom_category'])
    # Number of digits of the selected UOM (2 when no UOM is set).
    uom_digits = fields.Function(
        fields.Integer('UOM Digits'), 'on_change_with_uom_digits')

    @classmethod
    def __setup__(cls):
        """Declare the uniqueness constraint on (location, product)."""
        super().__setup__()
        t = cls.__table__()
        cls._sql_constraints = [
            ('location_product_uniq', Unique(t, t.location, t.product),
                'stock_location_product_limit.'
                'msg_stock_location_product_limit_location_product_uniq')]

    @fields.depends('product')
    def on_change_product(self):
        """Default the UOM to the product's default UOM."""
        if self.product:
            self.uom = self.product.default_uom

    @fields.depends('product')
    def on_change_with_uom_category(self, name=None):
        """Return the id of the product's default UOM category, if any."""
        if self.product:
            return self.product.default_uom_category.id

    @fields.depends('uom')
    def on_change_with_uom_digits(self, name=None):
        """Return the digits of the selected UOM, or 2 when none is set."""
        if self.uom:
            return self.uom.digits
        return 2

    @classmethod
    def product_limits_by_location(cls, location_id, product_ids=None,
            at_date=None):
        """Compare the limits defined on a location with its stock.

        :param location_id: id of the location whose limits are read.
        :param product_ids: optional ids restricting the products; when
            empty or None every product with a limit on the location is
            used.
        :param at_date: optional date stored as ``stock_date_end`` in the
            context used to compute the quantities.
        :return: a list with one dict per limit, with the keys ``item``
            (product rec_name), ``assigned`` (limit converted to the
            product default UOM), ``stock``, ``diff`` (assigned - stock)
            and ``unit`` (product default UOM). Products whose computed
            quantity is falsy are left out.
        """
        pool = Pool()
        Product = pool.get('product.product')
        Uom = pool.get('product.uom')

        # Work on a copy so the running transaction context is untouched.
        context = Transaction().context.copy()
        context.update({
            'locations': [location_id],
            'with_childs': True,
            })
        if at_date:
            context['stock_date_end'] = at_date

        limits = cls.search([
            ('location', '=', location_id),
            ('product', 'in', product_ids) if product_ids else ()])
        if not product_ids:
            product_ids = [limit.product.id for limit in limits]
        products = Product.browse(product_ids)
        with Transaction().set_context(context):
            quantities = Product.get_quantity(products, 'quantity')
        stock_by_product = {
            product_id: qty
            for product_id, qty in quantities.items() if qty}

        summary = []
        for limit in limits:
            product = limit.product
            if product.id not in stock_by_product:
                continue
            assigned = Uom.compute_qty(
                limit.uom, limit.quantity, product.default_uom)
            stock = stock_by_product[product.id]
            summary.append({
                'item': product.rec_name,
                'assigned': assigned,
                'stock': stock,
                'diff': assigned - stock,
                'unit': product.default_uom,
                })
        return summary
|
|
|
|
|
|
class ShipmentOut(metaclass=PoolMeta):
    __name__ = 'stock.shipment.out'

    def product_limits_by_location(self):
        """Return the product limit summary of the party warehouse used.

        The warehouse is resolved with ``get_party_warehouse_used``; when
        that yields nothing an empty list is returned instead of failing
        on ``warehouse.id``.
        """
        ProductLimit = Pool().get('stock.location.product_limit')
        warehouse = self.get_party_warehouse_used(
            **self._get_party_warehouse_pattern())
        if not warehouse:
            return []
        return ProductLimit.product_limits_by_location(warehouse.id)

    @classmethod
    def wait(cls, shipments):
        """Run the parent ``wait`` and warn when a product limit is exceeded.

        For every outgoing move whose product has a limit on the party
        warehouse used by the shipment, the location quantity plus the
        move quantity is compared with the limit (both in the product
        default UOM).

        :raises UserWarning: when the limit is exceeded and the warning
            has not been acknowledged yet.
        """
        # TODO: implement with wizard in a new method 'wait_try'
        # similar to 'assign_try'.
        super().wait(shipments)

        pool = Pool()
        ProductLimit = pool.get('stock.location.product_limit')
        Uom = pool.get('product.uom')
        Warning = pool.get('res.user.warning')

        # (product, quantity) pairs already checked in this call.
        # NOTE(review): the key ignores the location and the move UOM, so a
        # pair checked for one shipment is skipped for the next ones -
        # confirm this is intended.
        cache = set()

        for shipment in shipments:
            location = shipment.get_party_warehouse_used(
                **shipment._get_party_warehouse_pattern())
            if not location:
                continue
            for move in shipment.outgoing_moves:
                key = (move.product, move.quantity)
                if key in cache:
                    continue
                pls = ProductLimit.search([
                    ('product', '=', move.product),
                    ('location', '=', location)])
                if pls:
                    product_limit = pls[0]
                    default_uom = move.product.default_uom
                    # set_context layers the product on top of the current
                    # context and restores it on exit; the transaction
                    # context itself is never modified in place.
                    with Transaction().set_context(product=move.product.id):
                        limit_location = product_limit.location
                        # NOTE(review): 'forecast' is passed as field name
                        # here while ShipmentInternal.wait passes
                        # 'quantity' - confirm which one get_quantity
                        # expects.
                        forecast_qty = (limit_location.get_quantity(
                                [limit_location],
                                'forecast')[limit_location.id]
                            + Uom.compute_qty(move.uom, move.quantity,
                                default_uom))
                    allowed = Uom.compute_qty(
                        product_limit.uom, product_limit.quantity,
                        default_uom)
                    if forecast_qty > allowed:
                        warning_name = 'product_limit_exceeded_%s_%s' % (
                            shipment.id, move.product.id)
                        if Warning.check(warning_name):
                            raise UserWarning(warning_name, gettext(
                                'stock_location_product_limit.'
                                'msg_stock_shipment_out_product_limit_exceeded',
                                product=move.product.name))
                cache.add(key)
|
|
|
|
|
|
class ShipmentOutReturn(metaclass=PoolMeta):
    __name__ = 'stock.shipment.out.return'

    def product_limits_by_location(self):
        """Return the product limit summary of the party warehouse used.

        The warehouse is resolved with ``get_party_warehouse_used``; when
        that yields nothing an empty list is returned instead of failing
        on ``warehouse.id``.
        """
        ProductLimit = Pool().get('stock.location.product_limit')
        warehouse = self.get_party_warehouse_used(
            **self._get_party_warehouse_pattern())
        if not warehouse:
            return []
        return ProductLimit.product_limits_by_location(warehouse.id)
|
|
|
|
|
|
class ShipmentInternal(metaclass=PoolMeta):
    __name__ = 'stock.shipment.internal'

    def product_limits_by_location(self):
        """Return the product limit summary of the party warehouse used.

        An empty list is returned when ``get_party_warehouse_used`` yields
        no warehouse.
        """
        ProductLimit = Pool().get('stock.location.product_limit')
        warehouse = self.get_party_warehouse_used(
            **self._get_party_warehouse_pattern())
        if not warehouse:
            return []
        return ProductLimit.product_limits_by_location(warehouse.id)

    @classmethod
    def wait(cls, shipments):
        """Run the parent ``wait`` and warn when a product limit is exceeded.

        Only moves whose destination belongs to the party warehouse used
        by the shipment are checked. For those, the location quantity plus
        the move quantity is compared with the limit (both in the product
        default UOM).

        :raises UserWarning: when the limit is exceeded and the warning
            has not been acknowledged yet.
        """
        super().wait(shipments)

        pool = Pool()
        ProductLimit = pool.get('stock.location.product_limit')
        Uom = pool.get('product.uom')
        Warning = pool.get('res.user.warning')
        # (product, quantity) pairs already checked in this call.
        cache = set()

        for shipment in shipments:
            location = shipment.get_party_warehouse_used(
                **shipment._get_party_warehouse_pattern())
            if not location:
                continue
            for move in shipment.moves:
                if move.to_location.warehouse != location:
                    continue
                key = (move.product, move.quantity)
                if key in cache:
                    continue
                pls = ProductLimit.search([
                    ('product', '=', move.product),
                    ('location', '=', location)])
                if pls:
                    product_limit = pls[0]
                    default_uom = move.product.default_uom
                    # set_context layers the product on top of the current
                    # context and restores it on exit; the transaction
                    # context itself is never modified in place.
                    with Transaction().set_context(product=move.product.id):
                        limit_location = product_limit.location
                        forecast_qty = (limit_location.get_quantity(
                                [limit_location],
                                'quantity')[limit_location.id]
                            + Uom.compute_qty(move.uom, move.quantity,
                                default_uom))
                    allowed = Uom.compute_qty(
                        product_limit.uom, product_limit.quantity,
                        default_uom)
                    if forecast_qty > allowed:
                        warning_name = 'product_limit_exceeded_%s_%s' % (
                            shipment.id, move.product.id)
                        if Warning.check(warning_name):
                            raise UserWarning(warning_name, gettext(
                                'stock_location_product_limit.'
                                'msg_stock_shipment_out_product_limit_exceeded',
                                product=move.product.name))
                cache.add(key)
|
|
|
|
|
|
class Location(metaclass=PoolMeta):
    __name__ = 'stock.location'

    # Product limits defined on this location.
    limits = fields.One2Many(
        'stock.location.product_limit', 'location', 'Limits')
    # Sum of the limits of the product found in the context.
    limit_quantity = fields.Function(
        fields.Float('Limit quantity'), 'get_limit_quantity')
    # limit_quantity minus the location quantity.
    diff_quantity = fields.Function(
        fields.Float('Diff. quantity'), 'get_diff_quantity')

    @classmethod
    def get_limit_quantity(cls, locations, name):
        """Return, per location id, the limit of the context product.

        The product id is read from the ``product`` key of the transaction
        context; when it is missing or not an int every location maps to
        0. Limits are converted to the product default UOM and summed.
        """
        pool = Pool()
        Uom = pool.get('product.uom')
        Product = pool.get('product.product')

        product_id = Transaction().context.get('product')
        totals = {location.id: 0 for location in locations}
        if not (product_id and isinstance(product_id, int)):
            return totals

        default_uom = Product(product_id).default_uom
        for location in locations:
            if location.limits:
                totals[location.id] = sum(
                    Uom.compute_qty(limit.uom, limit.quantity, default_uom)
                    for limit in location.limits
                    if limit.product.id == product_id)
        return totals

    @classmethod
    def get_diff_quantity(cls, locations, name):
        """Return, per location id, limit_quantity minus quantity."""
        differences = {}
        for location in locations:
            current = location.quantity or 0
            differences[location.id] = location.limit_quantity - current
        return differences
|
|
|
|
|
|
class ProductLimitNote(Report):
    """Location Product limit note"""
    __name__ = 'stock.location.product_limit.note'

    @classmethod
    def get_context(cls, records, header, data):
        """Build the report context for one product on one location.

        ``data`` supplies ``company``, ``product``, ``location``,
        ``start_date`` and ``end_date``. On top of the parent context the
        following keys are set: ``company``, ``product``, ``location``,
        ``moves`` (quantities grouped by date and origin record, sorted by
        date), ``models`` (model name -> label), ``limit`` (limit summary
        at the end date) and ``cumulate`` (running total per date).
        """
        pool = Pool()
        Product = pool.get('product.product')
        Company = pool.get('company.company')
        Location = pool.get('stock.location')
        ProductLimit = pool.get('stock.location.product_limit')
        report_context = super().get_context(records, header, data)

        report_context['company'] = Company(data['company'])
        report_context['product'] = Product(data['product'])
        report_context['location'] = Location(data['location'])

        def stock_context(start_date, end_date):
            # Context keys bounding the stock computation to a period.
            return {
                'stock_date_start': start_date,
                'stock_date_end': end_date,
                'forecast': True,
                }

        location_id = data['location']
        product_id = data['product']
        # Day before the period: its stock is the opening balance.
        opening_date = data['start_date'] + relativedelta(days=-1)

        with Transaction().set_context(**stock_context(None, opening_date)):
            by_location = Product.products_by_location(
                [location_id], with_childs=True,
                grouping_filter=([product_id], ))
            opening_stock = by_location.get((location_id, product_id), 0)

        period = stock_context(data['start_date'], data['end_date'])
        with Transaction().set_context(**period):
            quantities = Product.products_by_location(
                [location_id], with_childs=True,
                grouping=('product', 'effective_date', 'origin', 'shipment'),
                grouping_filter=([product_id], ))

        values = {(opening_date, None): opening_stock}
        for key, qty in quantities.items():
            # key holds the location followed by the grouping values:
            # index 2 is the effective date, 3 the origin, 4 the shipment.
            reference = key[3] or key[4]
            origin_record = None
            if reference:
                model_name, record_id = reference.split(',')
                origin_record = pool.get(model_name)(record_id)
            bucket = (key[2], origin_record)
            values[bucket] = values.get(bucket, 0) + qty

        report_context['moves'] = cls._get_sorted_moves(values)
        report_context['models'] = cls.get_models(report_context['moves'])
        report_context['limit'] = ProductLimit.product_limits_by_location(
            location_id, product_ids=[product_id], at_date=data['end_date'])

        running_total = 0
        totals_by_date = {}
        for (move_date, _), amount in report_context['moves']:
            # The first entry of a date starts from the previous balance.
            if move_date not in totals_by_date:
                totals_by_date[move_date] = running_total
            totals_by_date[move_date] += amount
            running_total += amount
        report_context['cumulate'] = sorted(
            totals_by_date.items(), key=lambda entry: entry[0] or date.min)
        return report_context

    @classmethod
    def _get_sorted_moves(cls, moves):
        """Return the items of ``moves`` sorted by the date of their key.

        Keys are ``(date, origin)`` tuples; entries without a date sort
        first.
        """
        return sorted(
            moves.items(), key=lambda entry: entry[0][0] or date.min)

    @classmethod
    def get_models(cls, moves):
        """Map the model names of the move origins to their ir.model name.

        ``None`` (no origin) maps to an empty string.
        """
        IrModel = Pool().get('ir.model')
        model_names = []
        for (_, origin), _qty in moves:
            model_names.append(origin.__name__ if origin else None)
        found = IrModel.search([
            ('model', 'in', model_names),
            ])
        labels = {None: ''}
        for ir_model in found:
            labels[ir_model.model] = ir_model.name
        return labels
|
|
|
|
|
|
class PrintProductLimitNoteParam(ModelView):
    """Print location product limit note param"""
    __name__ = 'stock.location.product_limit.note_print.params'

    # Period covered by the note.
    start_date = fields.Date('Start date', required=True)
    end_date = fields.Date('End date', required=True)
    # Product to report on.
    product = fields.Many2One(
        'product.product', 'Product', required=True)
    # Location to report on; only locations having limits can be chosen.
    location = fields.Many2One(
        'stock.location', 'Location', required=True,
        domain=[('limits', '!=', None)])
|
|
|
|
|
|
class PrintProductLimitNote(Wizard):
    """Print Location product limit note"""
    __name__ = 'stock.location.product_limit.note_print'

    # start -> params (form) -> print_ (report)
    start = StateTransition()
    params = StateView(
        'stock.location.product_limit.note_print.params',
        'stock_location_product_limit.print_params_view_form',
        [
            Button('Cancel', 'end', 'tryton-cancel'),
            Button('Print', 'print_', 'tryton-print', default=True),
            ])
    print_ = StateReport('stock.location.product_limit.note')

    def transition_start(self):
        """Go straight to the parameters form."""
        return 'params'

    def do_print_(self, action):
        """Return the report action with the data chosen in the form."""
        active_ids = Transaction().context.get('active_ids')
        if active_ids:
            # Drop the last active id from the context list; the removed
            # value itself is not used.
            Transaction().context['active_ids'].pop()
        params = self.params
        data = {
            'company': Transaction().context['company'],
            'start_date': params.start_date,
            'end_date': params.end_date,
            'product': params.product.id,
            'location': params.location.id,
            }
        return action, data
|
|
|
|
|
|
class ReportLimitMixin(object):
    # Mixin for shipment reports: adds a 'product_limits' entry to the
    # report context, keyed by record id.

    @classmethod
    def get_context(cls, records, header, data):
        """Extend the parent context with the limit summary of each record.

        When the stock configuration ``show_limit`` flag is unset every
        record maps to an empty list.
        """
        Conf = Pool().get('stock.configuration')

        report_context = super().get_context(records, header, data)

        show_limit = Conf(1).show_limit
        report_context['product_limits'] = {
            record.id: (
                record.product_limits_by_location() if show_limit else [])
            for record in records}
        return report_context
|
|
|
|
|
|
class DeliveryNote(ReportLimitMixin, metaclass=PoolMeta):
    # Adds the product limit summary to the delivery note context.
    __name__ = 'stock.shipment.out.delivery_note'
|
|
|
|
|
|
class RestockingList(ReportLimitMixin, metaclass=PoolMeta):
    # Adds the product limit summary to the restocking list context.
    __name__ = 'stock.shipment.out.return.restocking_list'
|
|
|
|
|
|
class InternalReport(ReportLimitMixin, metaclass=PoolMeta):
    # Adds the product limit summary to the internal shipment report
    # context.
    __name__ = 'stock.shipment.internal.report'
|
|
|
|
|
|
class Configuration(metaclass=PoolMeta):
    __name__ = 'stock.configuration'

    # Flag read by the shipment reports to decide whether the product
    # limit summary is included.
    show_limit = fields.Boolean(
        'Show product limits',
        help='If checked a summary of product limits is shown in shipment '
        'reports.')

    @staticmethod
    def default_show_limit():
        """Product limits are shown by default."""
        return True
|