mirror of
https://gitlab.com/datalifeit/trytond-stock_distribute
synced 2023-12-14 05:02:53 +01:00
264 lines
12 KiB
Python
264 lines
12 KiB
Python
# The COPYRIGHT file at the top level of
|
|
# this repository contains the full copyright notices and license terms.
|
|
from trytond.model import fields, ModelView
|
|
from trytond.pool import PoolMeta, Pool
|
|
from trytond.pyson import Eval
|
|
from trytond.transaction import Transaction
|
|
|
|
__all__ = ['Move', 'Location']
|
|
|
|
__metaclass__ = PoolMeta
|
|
|
|
|
|
class Move:
|
|
__name__ = 'stock.move'
|
|
|
|
@classmethod
|
|
def __setup__(cls):
|
|
super(Move, cls).__setup__()
|
|
cls._error_messages.update(
|
|
{'cannot_distribute': 'Cannot distribute movements along Locations. '
|
|
'Please revise location sequences configuration and space availability.'})
|
|
cls._buttons.update(
|
|
{'distribute': {'invisible': ~Eval('state').in_(['draft'])}})
|
|
|
|
@classmethod
|
|
@ModelView.button
|
|
def distribute(cls, moves, order='ascendant'):
|
|
pool = Pool()
|
|
_Move = pool.get('stock.move')
|
|
temp_moves = []
|
|
cls.cancel(moves)
|
|
for m in moves:
|
|
_move = _Move()
|
|
for _field in _Move._fields:
|
|
if _field in ('create_date', 'id', 'create_uid', 'state'
|
|
'write_date', 'write_uid', 'rec_name'):
|
|
continue
|
|
setattr(_move, _field, getattr(m, _field, None))
|
|
temp_moves.append(_move)
|
|
new_moves = cls._distribute(temp_moves, order)
|
|
if new_moves != moves:
|
|
cls.delete(moves)
|
|
cls.save(new_moves)
|
|
|
|
@classmethod
|
|
def _distribute(cls, moves, order='ascendant'):
|
|
"""
|
|
Distributes given movements along locations
|
|
in order to not overload storage space
|
|
|
|
moves is a list of movements. They can be existing movements or
|
|
new ones that will be persisted.
|
|
order determines the filling order based on location sequence field
|
|
|
|
Returns new list of movements
|
|
"""
|
|
pool = Pool()
|
|
Location = pool.get('stock.location')
|
|
_Move = pool.get('stock.move')
|
|
|
|
if not moves:
|
|
return []
|
|
|
|
new_moves = []
|
|
to_review = cls._get_locations_to_check_space(moves)
|
|
for key, extra_space in to_review.iteritems():
|
|
loc = key[0]
|
|
date = key[1]
|
|
for m in moves:
|
|
if not m.to_location.id == loc:
|
|
continue
|
|
if getattr(m, 'effective_date', None) and getattr(m, 'effective_date', None) != date:
|
|
continue
|
|
if m.planned_date and not m.planned_date == date:
|
|
continue
|
|
if not m.product.occupy_space:
|
|
new_moves.append(m)
|
|
continue
|
|
|
|
av_locations = Location.get_next_available_locations(m.to_location,
|
|
date,
|
|
m.product.get_space(m.quantity, m.uom),
|
|
m.product.length,
|
|
order)
|
|
if not av_locations:
|
|
cls.raise_user_error('cannot_distribute', {})
|
|
assigned_qty = 0
|
|
for item in av_locations:
|
|
for av_key, av_space in item.iteritems():
|
|
if not av_key:
|
|
cls.raise_user_error('cannot_distribute', {})
|
|
location = Location(av_key)
|
|
space = m.product.get_space((m.quantity - assigned_qty), m.uom)
|
|
qty = m.product.get_quantity_from_space(av_space if av_space < space else space)
|
|
if not qty or qty < 0:
|
|
continue
|
|
move = _Move()
|
|
for _field in _Move._fields:
|
|
if _field in ('create_date', 'id', 'create_uid',
|
|
'write_date', 'write_uid', 'rec_name', 'state'):
|
|
continue
|
|
setattr(move, _field, getattr(m, _field, None))
|
|
move.to_location = location
|
|
move.quantity = qty
|
|
new_moves.append(move)
|
|
assigned_qty += qty
|
|
|
|
if m.quantity != assigned_qty:
|
|
new_moves[len(new_moves) - 1].quantity += (m.quantity - assigned_qty)
|
|
if not new_moves:
|
|
return moves
|
|
return new_moves
|
|
|
|
|
|
class Location:
|
|
__name__ = 'stock.location'
|
|
|
|
@classmethod
|
|
def __setup__(cls):
|
|
super(Location, cls).__setup__()
|
|
if cls.sequence.depends:
|
|
if 'parent' not in cls.sequence.depends:
|
|
cls.sequence.depends.append('parent')
|
|
if 'type' not in cls.sequence.depends:
|
|
cls.sequence.depends.append('type')
|
|
else:
|
|
cls.sequence.depends = ['parent', 'type']
|
|
cls._error_messages.update({
|
|
'duplicated_sequence': 'Sequence number %(sequence)s must be unique among '
|
|
'childs of type %(type)s of location %(location)s.',
|
|
'no_next_location': 'Next locations to fill after "%s" cannot be obtained. '
|
|
'Please check locations sequences configuration.\nIf you decided '
|
|
'to continue anyway, exceeded quantity will be storage in "%s".'})
|
|
|
|
@classmethod
|
|
def validate(cls, locations):
|
|
super(Location, cls).validate(locations)
|
|
for location in locations:
|
|
location.check_sequence()
|
|
|
|
@fields.depends('sequence', 'parent', 'type')
|
|
def on_change_with_sequence(self):
|
|
if self.sequence:
|
|
return self.sequence
|
|
if not self.parent:
|
|
return None
|
|
return self.parent.get_next_sequence()
|
|
|
|
def get_next_sequence(self):
|
|
max_seq = max([0, ] + [c.sequence for c in self.childs if c.sequence])
|
|
return max_seq + 1
|
|
|
|
def check_sequence(self):
|
|
pool = Pool()
|
|
Location = pool.get('stock.location')
|
|
|
|
if not self.parent or not self.sequence:
|
|
return
|
|
location = Location.search([('parent', '=', self.parent.id),
|
|
('id', '!=', self.id),
|
|
('type', '=', self.type),
|
|
('sequence', '=', self.sequence)])
|
|
if location:
|
|
self.raise_user_error('duplicated_sequence',
|
|
{'location': self.parent.rec_name,
|
|
'type': self.type,
|
|
'sequence': self.sequence})
|
|
|
|
@classmethod
|
|
def get_next_available_locations(cls, start_location,
|
|
date, needed_space,
|
|
granularity,
|
|
order='ascendant'):
|
|
""" Collects locations necessary to complete space needed.
|
|
|
|
start_location determines where to start to fill.
|
|
date is the date to calculate stock forecast.
|
|
needed_space is the quantity of space needed.
|
|
granularity is the precision for space measurement. It depends on product measurements
|
|
order determines the filling order based on location sequence field
|
|
|
|
Returns a list of dictionaries with location id as key and quantity that can be stored in it.
|
|
"""
|
|
pool = Pool()
|
|
Location = pool.get('stock.location')
|
|
|
|
# when is view or warehouse, starts in a child
|
|
if start_location.type in ['view', 'warehouse']:
|
|
if not start_location.childs:
|
|
return []
|
|
storage_childs = [c for c in start_location.childs if c.type == 'storage' and c.sequence]
|
|
if not storage_childs:
|
|
return []
|
|
if order == 'ascendant':
|
|
seq = min(c.sequence for c in storage_childs)
|
|
else:
|
|
seq = max(c.sequence for c in storage_childs)
|
|
|
|
storage, = [c for c in storage_childs if c.sequence == seq]
|
|
return cls.get_next_available_locations(storage, date, needed_space, granularity, order)
|
|
|
|
if not start_location.control_space:
|
|
return [{start_location.id: needed_space}]
|
|
|
|
with Transaction().set_context(stock_date_end=date):
|
|
av_space = start_location.get_available_space()
|
|
if av_space >= needed_space:
|
|
return [{start_location.id: needed_space}]
|
|
if not start_location.parent:
|
|
cls.raise_user_warning('%s.no_next_location' % start_location.id,
|
|
'no_next_location',
|
|
(start_location.rec_name, start_location.rec_name))
|
|
return {start_location.id: needed_space}
|
|
|
|
# find next locations to fill
|
|
result = [{start_location.id: av_space}]
|
|
assigned_space = av_space
|
|
location_domain = [('parent', '=', start_location.parent.id),
|
|
('sequence', '!=', None),
|
|
('sequence', '>' if order == 'ascendant' else '<', start_location.sequence)]
|
|
|
|
locations = Location.search(location_domain,
|
|
order=[('sequence', 'ASC' if order == 'ascendant' else 'DESC')])
|
|
|
|
for l in locations:
|
|
if needed_space == assigned_space:
|
|
return result
|
|
with Transaction().set_context(stock_date_end=date):
|
|
av_space = int(l.get_available_space()/granularity) * granularity
|
|
if av_space >= (needed_space - assigned_space) > 0:
|
|
space = (needed_space - assigned_space)
|
|
else:
|
|
space = av_space
|
|
result.append({l.id: space})
|
|
assigned_space += space
|
|
|
|
# Goes up in location hierarchy to continue filling
|
|
if needed_space > assigned_space:
|
|
if not getattr(start_location.parent, 'parent', None):
|
|
result.append({None: needed_space - assigned_space})
|
|
return result
|
|
location_parent = start_location.parent.parent
|
|
location_domain = [('parent', '=', location_parent.id),
|
|
('sequence', '!=', None),
|
|
('sequence', '>' if order == 'ascendant' else '<', start_location.parent.sequence)]
|
|
last_location = locations[len(locations)-1] if locations else start_location
|
|
locations = Location.search(location_domain,
|
|
order=[('sequence', 'ASC' if order == 'ascendant' else 'DESC')],
|
|
limit=1)
|
|
if not locations:
|
|
cls.raise_user_warning('%s.no_next_location' % start_location.id,
|
|
'no_next_location',
|
|
(last_location.rec_name, last_location.rec_name))
|
|
result[len(result) - 1][last_location.id] = (
|
|
result[len(result) - 1][last_location.id] + (needed_space - assigned_space))
|
|
return result
|
|
|
|
# get next children locations from the following parent child location
|
|
result.extend(cls.get_next_available_locations(locations[0], date,
|
|
(needed_space - assigned_space),
|
|
granularity,
|
|
order))
|
|
|
|
return result |