trytond-stock_last_price/stock.py

140 lines
5.1 KiB
Python

# The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
from sql import Column
from sql.aggregate import Max
from sql.conditionals import Coalesce
from sql.operators import Concat
from trytond.model import fields
from trytond.pool import PoolMeta, Pool
from trytond.pyson import Eval
from trytond.transaction import Transaction
__all__ = ['Move', 'ShipmentIn', 'ShipmentOut']
class Move:
__name__ = 'stock.move'
__metaclass__ = PoolMeta
last_prices = fields.Function(
fields.Many2Many('stock.move', None, None, 'Last move prices',
domain=[('product', '=', Eval('product'))],
depends=['product']),
'get_last_prices')
@fields.depends('product', 'shipment', 'effective_date', 'planned_date')
def on_change_with_last_prices(self):
return self.get_last_prices([self])[self.id]
@classmethod
def get_last_prices(cls, records, name=None):
pool = Pool()
Date = pool.get('ir.date')
move = cls.__table__()
cursor = Transaction().connection.cursor()
today = Date.today()
res = {r.id: [] for r in records}
for record in records:
if not record.product or not record.shipment:
continue
_shipment_name = record.shipment.__name__
Shipment = pool.get(_shipment_name)
shipment = Shipment.__table__()
_party = getattr(record.shipment,
Shipment._get_last_price_party_fieldname(), None)
if not _party:
continue
_date = (record.effective_date or record.planned_date or
record.shipment.effective_date or
record.shipment.planned_date or today)
cursor.execute(*shipment.join(
move, condition=(
move.shipment == Concat('%s,' % _shipment_name,
shipment.id))
).select(Max(move.id),
where=((move.product == record.product.id) &
(move.shipment != '%s,%s' % (_shipment_name,
record.shipment.id)) &
(move.state != 'cancelled') &
(Column(shipment,
Shipment._get_last_price_party_fieldname()) ==
_party.id) &
(Coalesce(shipment.effective_date,
shipment.planned_date) <= _date)),
order_by=(move.product, shipment.id),
group_by=(move.product, shipment.id),
limit=3)
)
res[record.id] = [row[0] for row in cursor.fetchall()] or []
return res
class ShipmentLastPriceMixin(object):
last_prices = fields.Function(
fields.Many2Many('stock.move', None, 'Last move prices'),
'get_last_prices')
@classmethod
def get_last_prices(cls, records, name=None):
pool = Pool()
Move = pool.get('stock.move')
move = Move.__table__()
active_move = Move.__table__()
shipment = cls.__table__()
cursor = Transaction().connection.cursor()
res = {r.id: [] for r in records}
for record in records:
_products = set([m.product.id for m in record.outgoing_moves])
party = getattr(record,
cls._get_last_price_party_fieldname(), None)
if not party or not _products:
continue
cursor.execute(*shipment.join(move, condition=(
move.shipment == Concat('%s,' % cls.__name__, shipment.id))
).join(active_move, condition=(
active_move.product == move.product)
).select(Max(move.id),
where=((active_move.shipment ==
Concat('%s,' % cls.__name__, record.id)) &
(shipment.id != record.id) &
(shipment.state != 'cancelled') &
(Column(shipment,
cls._get_last_price_party_fieldname()) ==
party.id) &
(Coalesce(shipment.effective_date,
shipment.planned_date) <= Coalesce(
record.effective_date,
record.planned_date))),
order_by=(move.product, shipment.id),
group_by=(move.product, shipment.id),
limit=len(_products) * 3)
)
res[record.id] = [row[0] for row in cursor.fetchall()] or []
return res
@classmethod
def _get_last_price_party_fieldname(cls):
raise NotImplementedError()
class ShipmentOut(ShipmentLastPriceMixin):
__name__ = 'stock.shipment.out'
__metaclass__ = PoolMeta
@classmethod
def _get_last_price_party_fieldname(cls):
return 'customer'
class ShipmentIn(ShipmentLastPriceMixin):
__name__ = 'stock.shipment.in'
__metaclass__ = PoolMeta
@classmethod
def _get_last_price_party_fieldname(cls):
return 'supplier'