421 lines
14 KiB
Python
421 lines
14 KiB
Python
# The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
|
|
import datetime
|
|
from functools import wraps
|
|
from dateutil.relativedelta import relativedelta
|
|
from trytond.exceptions import UserError
|
|
from trytond.i18n import gettext
|
|
from trytond.model import fields
|
|
from trytond.pool import PoolMeta, Pool
|
|
from trytond.pyson import Eval, Not, Bool, And
|
|
from trytond.transaction import Transaction
|
|
|
|
# Names exported by a star-import of this module.
__all__ = [
    'ShipmentOut',
    'ShipmentInternal',
    'ShipmentOutReturn',
    'ShipmentInReturn',
    ]
|
|
|
|
|
|
class ShipmentUnitLoadMixin(object):
    """Mixin that adds a computed ``unit_loads`` field to a shipment model.

    The getter and setter defined here are no-ops; the shipment classes in
    this module override ``get_unit_loads`` and
    ``_get_ul_readonly_fields_name``.
    """

    # Editable only in draft; hidden outside draft when there is no unit load.
    unit_loads = fields.Function(
        fields.One2Many('stock.unit_load', None, 'Unit loads',
            states={
                'readonly': Eval('state') != 'draft',
                'invisible': And(
                    Eval('state') != 'draft',
                    Not(Bool(Eval('unit_loads', []))))
            },
            depends=['state']),
        'get_unit_loads', setter='set_unit_loads',
        searcher='search_unit_loads')

    @classmethod
    def __setup__(cls):
        """Make the fields named by ``_get_ul_readonly_fields_name``
        read-only as soon as the shipment has unit loads."""
        super().__setup__()
        for _field_name in cls._get_ul_readonly_fields_name():
            _field = getattr(cls, _field_name)
            if _field.states.get('readonly'):
                # Keep the existing condition and OR the new one to it.
                _field.states['readonly'] |= Eval('unit_loads')
            else:
                _field.states['readonly'] = Eval('unit_loads')
            if 'unit_loads' not in _field.depends:
                _field.depends.append('unit_loads')

    def get_unit_loads(self, name=None):
        """Getter of ``unit_loads``; does nothing in the mixin."""
        pass

    @classmethod
    def set_unit_loads(cls, records, name, value):
        """Setter of ``unit_loads``; intentionally does nothing."""
        pass

    @classmethod
    def search_unit_loads(cls, name, clause):
        """Translate a domain clause on ``unit_loads`` into one on
        ``moves.unit_load``.

        :param name: name of the searched field.
        :param clause: domain clause; its first item is the (possibly
            dotted) field path, the remaining items are kept untouched.
        :return: a domain list holding the single rewritten clause.
        """
        _field = 'moves.unit_load'
        # The dotted sub-path travels in the clause itself.
        # NOTE(review): ``name`` appears to be the bare field name in
        # Tryton - confirm for the targeted version. It is still honoured
        # as a fallback when the clause carries no dotted path.
        target = clause[0] if '.' in clause[0] else name
        # Split on the first dot so no leading dot leaks into the result.
        _, _, nested = target.partition('.')
        if nested:
            _field += '.%s' % nested
        return [(_field, ) + tuple(clause[1:])]

    @classmethod
    def _get_ul_readonly_fields_name(cls):
        """Return the names of the fields ``__setup__`` makes read-only
        when unit loads are present (none by default)."""
        return []
|
|
|
|
|
|
class AtWarehouseMixin(object):
    """Mixin restricting which unit loads can be added to ``unit_loads``."""

    @classmethod
    def __setup__(cls):
        """Extend the ``add_remove`` domain of ``unit_loads`` and make the
        field depend on ``warehouse``."""
        super().__setup__()
        unit_loads_field = cls.unit_loads
        # Unit loads must match the shipment warehouse, be available and
        # be done, both in production and in state.
        warehouse_domain = [
            ('at_warehouse', '=', Eval('warehouse', None)),
            ('available', '=', True),
            ('production_state', '=', 'done'),
            ('state', '=', 'done')
            ]
        if not unit_loads_field.add_remove:
            unit_loads_field.add_remove = warehouse_domain
        else:
            unit_loads_field.add_remove += warehouse_domain
        unit_loads_field.depends.append('warehouse')
|
|
|
|
|
|
class ShipmentOut(ShipmentUnitLoadMixin, AtWarehouseMixin, metaclass=PoolMeta):
    """Extension of ``stock.shipment.out`` whose unit loads are derived
    from its outgoing moves."""
    __name__ = 'stock.shipment.out'

    @classmethod
    def __setup__(cls):
        """Also make ``unit_loads`` read-only while no warehouse is set."""
        super().__setup__()
        missing_warehouse = Not(Bool(Eval('warehouse')))
        cls.unit_loads.states['readonly'] |= missing_warehouse

    def get_unit_loads(self, name=None):
        """Return the distinct ids of the unit loads found on the outgoing
        moves (an empty list when there are no outgoing moves)."""
        if not self.outgoing_moves:
            return []
        return list({
            move.unit_load.id
            for move in self.outgoing_moves
            if move.unit_load})

    @fields.depends('unit_loads', 'outgoing_moves', 'warehouse_output',
        'customer_location', 'end_date')
    def on_change_unit_loads(self):
        """Rebuild ``outgoing_moves`` from the selected unit loads.

        Existing moves of a selected unit load are kept; a selected unit
        load without any kept move gets the moves returned by its
        ``_move`` method. Every other move is dropped.
        """
        selected_ids = [unit_load.id for unit_load in self.unit_loads]
        kept_moves = []
        if self.outgoing_moves:
            for move in self.outgoing_moves:
                if move.unit_load and move.unit_load.id in selected_ids:
                    kept_moves.append(move)
        for unit_load in self.unit_loads:
            if any(move.unit_load.id == unit_load.id for move in kept_moves):
                continue
            kept_moves.extend(unit_load._move(
                self.customer_location,
                self.end_date,
                from_location=self.warehouse_output))
        self.outgoing_moves = kept_moves

    def _get_inventory_move(self, move):
        """Return the inventory move built by the parent class; when the
        outgoing move has a unit load, copy it to the inventory move and
        start from the unit load's storage location."""
        inventory_move = super()._get_inventory_move(move)
        if not inventory_move or not move.unit_load:
            return inventory_move
        unit_load = move.unit_load
        storage_by_id = unit_load.get_location([unit_load], type='storage')
        inventory_move.from_location = storage_by_id[unit_load.id]
        inventory_move.unit_load = unit_load
        return inventory_move

    @classmethod
    def _get_ul_readonly_fields_name(cls):
        """Move fields made read-only when the shipment has unit loads."""
        return ['inventory_moves', 'outgoing_moves']

    @classmethod
    def _sync_inventory_to_outgoing(cls, shipments, quantity=True):
        """Run the parent synchronisation only for the shipments that have
        no unit loads."""
        without_unit_loads = [
            shipment for shipment in shipments if not shipment.unit_loads]
        super()._sync_inventory_to_outgoing(
            without_unit_loads, quantity=quantity)
|
|
|
|
|
|
def set_unit_load_warehouse(func):
    """Decorate a ``(cls, records)`` shipment method with unit-load checks.

    The wrapper collects the distinct unit loads of ``records``, calls
    ``check_last_moves_dates`` on them, runs ``func`` and, when there is
    at least one unit load, calls ``set_at_warehouse`` on them.

    :param func: function taking ``(cls, records)``.
    :return: the wrapping function; it returns whatever ``func`` returns.
    """
    @wraps(func)
    def wrapper(cls, records):
        pool = Pool()
        UnitLoad = pool.get('stock.unit_load')

        # Collected before ``func`` runs and reused afterwards.
        uls_to_save = list({ul for record in records
            for ul in record.unit_loads})

        UnitLoad.check_last_moves_dates(uls_to_save)
        # Keep the wrapped method's result so callers still receive it.
        result = func(cls, records)

        if uls_to_save:
            UnitLoad.set_at_warehouse(uls_to_save)
        return result
    return wrapper
|
|
|
|
|
|
class ShipmentInternal(ShipmentUnitLoadMixin, AtWarehouseMixin,
        metaclass=PoolMeta):
    """Extension of ``stock.shipment.internal`` whose unit loads are
    derived from its moves."""
    __name__ = 'stock.shipment.internal'

    ul_quantity = fields.Function(
        fields.Float('ULs', digits=(16, 0)), 'get_ul_quantity')

    def get_ul_quantity(self, name=None):
        """Return the number of unit loads, or None when there are none."""
        if not self.unit_loads:
            return None
        return len(self.unit_loads)

    def get_unit_loads(self, name=None):
        """Return the distinct ids of the unit loads found on the moves."""
        if not self.moves:
            return []
        uls = set(m.unit_load.id for m in self.moves if m.unit_load)
        return list(uls)

    @fields.depends('unit_loads', 'moves', 'date_time_', 'from_location',
        'to_location')
    def on_change_unit_loads(self):
        """Rebuild ``moves`` from the selected unit loads.

        Existing moves of a selected unit load are kept; a selected unit
        load without any kept move gets the moves returned by its
        ``_move`` method. Every other move is dropped.
        """
        moves = []
        ul_ids = [ul.id for ul in self.unit_loads]
        if self.moves:
            # Moves without a unit load are skipped instead of raising
            # AttributeError on ``None.id``.
            moves.extend(
                [m for m in self.moves
                    if m.unit_load and m.unit_load.id in ul_ids])
        for ul in self.unit_loads:
            if ul.id in [m.unit_load.id for m in moves]:
                continue
            new_moves = ul._move(self.to_location,
                self.date_time_,
                from_location=self.from_location)
            moves.extend(new_moves)
        self.moves = moves

    @classmethod
    def _get_ul_readonly_fields_name(cls):
        """Move fields made read-only when the shipment has unit loads."""
        return ['moves', 'outgoing_moves', 'incoming_moves']

    @classmethod
    def wait(cls, records, moves=None):
        """Fix the origin of unit-load moves, then run the parent ``wait``.

        Only shipments in ``draft`` state are processed. For each of them
        the storage location of every unit load is looked up, checked
        against the shipment's ``from_location`` and written to the
        ``from_location`` of the moves carrying that unit load.

        :raises UserError: when a unit load's location is not returned by
            the ``child_of`` search on the shipment's ``from_location``.
        """
        pool = Pool()
        Location = pool.get('stock.location')
        Move = pool.get('stock.move')
        UnitLoad = pool.get('stock.unit_load')

        # fix from location in UL moves
        ul_moves = []
        for record in records:
            if record.state != 'draft':
                continue
            ul_locations = {}
            if record.unit_loads:
                # get current location one microsecond before this shipment
                ul_locations.update(UnitLoad.get_location(
                    list(record.unit_loads),
                    at_date=(record.date_time_
                        - relativedelta(microseconds=1)),
                    type='storage',
                    move_states=['assigned', 'done']
                ))
                # check locations are child of shipment from location
                cursor = Transaction().connection.cursor()
                cursor.execute(*Location.search([
                    ('parent', 'child_of', record.from_location),
                    ('id', 'in', list(ul_locations.values()))], query=True)
                )
                locations = [r[0] for r in cursor.fetchall()]
                for unit_load_id, location_id in ul_locations.items():
                    if location_id not in locations:
                        raise UserError(gettext('stock_unit_load.'
                            'msg_shipment_internal_wrong_ul_from_location',
                            unit_load=UnitLoad(unit_load_id).rec_name,
                            from_location=record.from_location.rec_name,
                            location=Location(location_id).rec_name))

            for move in record.moves:
                if not move.unit_load:
                    continue
                move.from_location = ul_locations[move.unit_load.id]
                ul_moves.append(move)

        if ul_moves:
            Move.save(ul_moves)
        super().wait(records, moves=moves)

    @classmethod
    def ship(cls, records):
        """Call ``check_last_moves_dates`` on the distinct unit loads of
        ``records`` before running the parent ``ship``."""
        pool = Pool()
        UnitLoad = pool.get('stock.unit_load')

        uls = list(set([ul for record in records for ul in record.unit_loads]))
        UnitLoad.check_last_moves_dates(uls)
        super().ship(records)

    @classmethod
    @set_unit_load_warehouse
    def done(cls, records):
        """Parent ``done`` wrapped by ``set_unit_load_warehouse``."""
        super().done(records)

    @classmethod
    @set_unit_load_warehouse
    def cancel(cls, records):
        """Parent ``cancel`` wrapped by ``set_unit_load_warehouse``."""
        super().cancel(records)
|
|
|
|
|
|
class ShipmentOutReturn(ShipmentUnitLoadMixin, metaclass=PoolMeta):
    """Extension of ``stock.shipment.out.return`` whose unit loads are
    derived from its incoming moves."""
    __name__ = 'stock.shipment.out.return'

    @classmethod
    def __setup__(cls):
        """Make ``unit_loads`` read-only while no warehouse is set and
        declare its dependency on ``warehouse``."""
        super().__setup__()
        unit_loads_field = cls.unit_loads
        unit_loads_field.states['readonly'] |= Not(Bool(Eval('warehouse')))
        unit_loads_field.depends.append('warehouse')

    def get_unit_loads(self, name=None):
        """Return the distinct ids of the unit loads found on the incoming
        moves (an empty list when there are no incoming moves)."""
        if not self.incoming_moves:
            return []
        return list({
            move.unit_load.id
            for move in self.incoming_moves
            if move.unit_load})

    @fields.depends('unit_loads', 'incoming_moves', 'effective_date',
        'planned_date', 'start_time', 'warehouse_input', 'customer_location')
    def on_change_unit_loads(self):
        """Rebuild ``incoming_moves`` from the selected unit loads.

        New moves are dated with the effective date, else the planned
        date, else today, combined with ``start_time`` or the current
        time.
        """
        selected_ids = [unit_load.id for unit_load in self.unit_loads]
        move_date = (self.effective_date or self.planned_date
            or datetime.date.today())
        move_time = self.start_time or datetime.datetime.now().time()
        date_time_ = datetime.datetime.combine(move_date, move_time)

        kept_moves = []
        if self.incoming_moves:
            for move in self.incoming_moves:
                if move.unit_load and move.unit_load.id in selected_ids:
                    kept_moves.append(move)
        for unit_load in self.unit_loads:
            if any(move.unit_load.id == unit_load.id for move in kept_moves):
                continue
            kept_moves.extend(unit_load._move(
                self.warehouse_input, date_time_,
                from_location=self.customer_location))
        self.incoming_moves = kept_moves

    def _get_inventory_move(self, incoming_move):
        """Return the inventory move built by the parent class, carrying
        over the unit load of the incoming move when it has one."""
        inventory_move = super()._get_inventory_move(incoming_move)
        if not inventory_move or not incoming_move.unit_load:
            return inventory_move
        inventory_move.unit_load = incoming_move.unit_load
        return inventory_move

    @classmethod
    def _get_ul_readonly_fields_name(cls):
        """Move fields made read-only when the shipment has unit loads."""
        return ['inventory_moves', 'incoming_moves']

    @classmethod
    @set_unit_load_warehouse
    def done(cls, records):
        """Parent ``done`` wrapped by ``set_unit_load_warehouse``."""
        super().done(records)

    @classmethod
    @set_unit_load_warehouse
    def cancel(cls, records):
        """Parent ``cancel`` wrapped by ``set_unit_load_warehouse``."""
        super().cancel(records)
|
|
|
|
|
|
class ShipmentInReturn(ShipmentUnitLoadMixin, AtWarehouseMixin,
        metaclass=PoolMeta):
    """Extension of ``stock.shipment.in.return`` whose unit loads are
    derived from its moves."""
    __name__ = 'stock.shipment.in.return'

    # Computed from ``from_location``.
    warehouse = fields.Function(fields.Many2One('stock.location', 'Warehouse'),
        'on_change_with_warehouse')

    @fields.depends('from_location')
    def on_change_with_warehouse(self, name=None):
        """Return the id of the warehouse of ``from_location``, or None
        when the location or its warehouse is not set."""
        if self.from_location and self.from_location.warehouse:
            return self.from_location.warehouse.id

    def get_unit_loads(self, name=None):
        """Return the distinct ids of the unit loads found on the moves."""
        if not self.moves:
            return []
        uls = set(m.unit_load.id for m in self.moves if m.unit_load)
        return list(uls)

    @fields.depends('unit_loads', 'moves', 'from_location', 'to_location',
        'end_date')
    def on_change_unit_loads(self):
        """Rebuild ``moves`` from the selected unit loads.

        Existing moves of a selected unit load are kept; a selected unit
        load without any kept move gets the moves returned by its
        ``_move`` method. Every other move is dropped.
        """
        moves = []
        ul_ids = [ul.id for ul in self.unit_loads]
        if self.moves:
            # Moves without a unit load are skipped instead of raising
            # AttributeError on ``None.id``.
            moves.extend(
                [m for m in self.moves
                    if m.unit_load and m.unit_load.id in ul_ids])
        for ul in self.unit_loads:
            if ul.id in [m.unit_load.id for m in moves]:
                continue
            new_moves = ul._move(self.to_location,
                self.end_date,
                from_location=self.from_location)
            moves.extend(new_moves)
        self.moves = moves

    @classmethod
    def _get_ul_readonly_fields_name(cls):
        """Move fields made read-only when the shipment has unit loads."""
        return ['moves']
|
|
|
|
|
|
class Assign(metaclass=PoolMeta):
    """Extension of the ``stock.shipment.assign`` wizard."""
    __name__ = 'stock.shipment.assign'

    def transition_start(self):
        """Force the assignment and end the wizard when the record has
        unit loads; otherwise defer to the parent transition."""
        if not getattr(self.record, 'unit_loads', None):
            return super().transition_start()
        # if has unit loads, locations may not change
        self.record.assign_force()
        return 'end'
|
|
|
|
|
|
class ShipmentInternalDone2Cancel(metaclass=PoolMeta):
    """Extension of ``stock.shipment.internal`` that checks unit loads
    before cancelling."""
    __name__ = 'stock.shipment.internal'

    @classmethod
    def cancel(cls, records):
        """Run ``_check_cancel_ul_moves`` on every record, then cancel."""
        for shipment in records:
            shipment._check_cancel_ul_moves()
        super().cancel(records)

    def _check_cancel_ul_moves(self):
        """Call ``check_to_move`` for each unit load of the shipment.

        Exactly one move must match a unit load and its product; the
        single-item unpacking raises ValueError otherwise.
        """
        for unit_load in self.unit_loads:
            candidates = self.incoming_moves or self.moves
            # get from_location from move due to can be a child of
            matching = [
                candidate for candidate in candidates
                if candidate.unit_load == unit_load
                and candidate.product == unit_load.product]
            move, = matching
            unit_load.check_to_move(
                move.from_location,
                self.to_location,
                self.date_time_,
                check_state=False)
|
|
|
|
|
|
class ShipmentOutDone2Cancel(metaclass=PoolMeta):
    """Extension of ``stock.shipment.out`` that checks unit loads before
    cancelling."""
    __name__ = 'stock.shipment.out'

    @classmethod
    def cancel(cls, records):
        """Run ``_check_cancel_ul_moves`` on every record, then cancel."""
        for shipment in records:
            shipment._check_cancel_ul_moves()
        super().cancel(records)

    def _check_cancel_ul_moves(self):
        """Call ``check_to_move`` on each unit load, from the warehouse
        output location to the customer location at ``end_date``."""
        route = (self.warehouse_output, self.customer_location)
        for unit_load in self.unit_loads:
            unit_load.check_to_move(
                *route, self.end_date, check_state=False)
|
|
|
|
|
|
class ShipmentOutReturnDone2Cancel(metaclass=PoolMeta):
    """Extension of ``stock.shipment.out.return`` that checks unit loads
    before cancelling."""
    __name__ = 'stock.shipment.out.return'

    @classmethod
    def cancel(cls, records):
        """Run ``_check_cancel_ul_moves`` on every record, then cancel."""
        for shipment in records:
            shipment._check_cancel_ul_moves()
        super().cancel(records)

    def _check_cancel_ul_moves(self):
        """Call ``check_to_move`` on each unit load, from the customer
        location to the warehouse storage location at ``end_date``."""
        route = (self.customer_location, self.warehouse_storage)
        for unit_load in self.unit_loads:
            unit_load.check_to_move(
                *route, self.end_date, check_state=False)
|