449 lines
17 KiB
Python
449 lines
17 KiB
Python
# The COPYRIGHT file at the top level of
|
|
# this repository contains the full copyright notices and license terms.
|
|
import datetime
|
|
from sql import Literal
|
|
from sql.aggregate import Max
|
|
from sql.conditionals import Coalesce
|
|
from sql.functions import Now
|
|
from trytond.model import fields, ModelSQL, ModelView
|
|
from trytond.pool import PoolMeta, Pool
|
|
from trytond.pyson import Eval, Equal, Not, Bool, Id, And, PYSONEncoder
|
|
from trytond.transaction import Transaction
|
|
from trytond.wizard import Wizard, StateAction
|
|
|
|
__all__ = ['Configuration', 'Location', 'Move',
|
|
'LocationSpaceAvailability', 'OpenLocationSpaceAvailability']
|
|
|
|
__metaclass__ = PoolMeta
|
|
|
|
|
|
class Configuration:
|
|
__name__ = 'stock.configuration'
|
|
|
|
space_control = fields.Boolean('Space control enabled',
|
|
help='Enables storage space control in locations for products which occupies space')
|
|
|
|
@staticmethod
|
|
def default_space_control():
|
|
return True
|
|
|
|
|
|
class Location:
|
|
__name__ = 'stock.location'
|
|
|
|
control_space = fields.Boolean('Enable space control',
|
|
help='Enables storage space control',
|
|
states={'invisible': Not(Equal(Eval('type'), 'storage'))},
|
|
depends=['type'])
|
|
space_measure = fields.Selection([('length', 'Length'),
|
|
('surface', 'Surface')], 'Space measure',
|
|
states={'invisible': Not(Equal(Eval('type'), 'storage')),
|
|
'required': Bool(Eval('control_space'))},
|
|
depends=['type', 'control_space'])
|
|
overload_behavior = fields.Selection([('warn', 'Warn'),
|
|
('stop', 'Stop')],
|
|
'Overload behavior',
|
|
help='Determines system behavior when storage space is overloaded',
|
|
states={'required': Bool(Eval('control_space')),
|
|
'invisible': Not(Equal(Eval('type'), 'storage'))},
|
|
depends=['type', 'control_space'])
|
|
available_space = fields.Function(fields.Float('Available space at date'), 'get_available_space')
|
|
space = fields.Function(fields.Float('Space capacity',
|
|
digits=(16, Eval('space_unit_digits', 2)),
|
|
depends=['space_unit_digits']),
|
|
'get_space')
|
|
space_unit = fields.Function(
|
|
fields.Many2One('product.uom', 'Space UOM',
|
|
domain=[('category', 'in', [Id('product', 'uom_cat_surface'),
|
|
Id('product', 'uom_cat_length')])],
|
|
states={'invisible': Not(Equal(Eval('type'), 'storage')),
|
|
'required': Bool(Eval('control_space'))},
|
|
depends=['type', 'control_space']),
|
|
'on_change_with_space_unit')
|
|
space_unit_digits = fields.Function(fields.Integer('Space Digits'),
|
|
'on_change_with_space_unit_digits')
|
|
|
|
@classmethod
|
|
def __setup__(cls):
|
|
super(Location, cls).__setup__()
|
|
if cls.length.states.get('required'):
|
|
cls.length.states['required'] |= Bool(Eval('control_space'))
|
|
else:
|
|
cls.length.states['required'] = Bool(Eval('control_space'))
|
|
cls.length.depends.append('control_space')
|
|
if cls.width.states.get('required'):
|
|
cls.width.states['required'] |= And(Bool(Eval('control_space')), Equal(Eval('space_measure'), 'surface'))
|
|
else:
|
|
cls.width.states['required'] = And(Bool(Eval('control_space')), Equal(Eval('space_measure'), 'surface'))
|
|
cls.width.depends.extend(['control_space', 'space_measure'])
|
|
|
|
@staticmethod
|
|
def default_space_measure():
|
|
return 'length'
|
|
|
|
@staticmethod
|
|
def default_space_unit():
|
|
pool = Pool()
|
|
Modeldata = pool.get('ir.model.data')
|
|
res = Location.default_space_measure()
|
|
if res == 'length':
|
|
return Modeldata.get_id('product', 'uom_meter')
|
|
if res == 'surface':
|
|
return Modeldata.get_id('product', 'uom_square_meter')
|
|
|
|
@staticmethod
|
|
def default_control_space():
|
|
return False
|
|
|
|
@staticmethod
|
|
def default_overload_behavior():
|
|
return 'warn'
|
|
|
|
@fields.depends('height_uom')
|
|
def on_change_with_space_unit_digits(self, name=None):
|
|
return (self.space_unit.digits if self.space_unit
|
|
else self.default_space_unit_digits())
|
|
|
|
@staticmethod
|
|
def default_space_unit_digits():
|
|
return 2
|
|
|
|
@fields.depends('control_space')
|
|
def on_change_control_space(self):
|
|
pool = Pool()
|
|
Uom = pool.get('product.uom')
|
|
|
|
if not self.control_space:
|
|
return
|
|
self.space_measure = 'length'
|
|
self.overload_behavior = 'warn'
|
|
self.space_unit = Uom(self.on_change_with_space_unit())
|
|
|
|
@fields.depends('space_measure')
|
|
def on_change_with_space_unit(self, name=None, value=None):
|
|
pool = Pool()
|
|
Modeldata = pool.get('ir.model.data')
|
|
|
|
if not value:
|
|
value = self.space_measure
|
|
if value == 'length':
|
|
return Modeldata.get_id('product', 'uom_meter')
|
|
if value == 'surface':
|
|
return Modeldata.get_id('product', 'uom_square_meter')
|
|
return None
|
|
|
|
@classmethod
|
|
def write(cls, *args):
|
|
super(Location, cls).write(*args)
|
|
locations = sum(args[::2], [])
|
|
cls._set_space_measure(locations)
|
|
|
|
#todo: control children configuration
|
|
@classmethod
|
|
def _set_space_measure(cls, locations):
|
|
to_update = set()
|
|
for location in locations:
|
|
if location.parent and location.parent.space_measure != location.space_measure:
|
|
to_update.add(location.parent)
|
|
if to_update:
|
|
cls.write(list(to_update), {
|
|
'space_measure': location.space_measure})
|
|
to_update.clear()
|
|
|
|
def storage_try(self, date, extra_space=float(0)):
|
|
""" Determines if storing in location at date is possible"""
|
|
with Transaction().set_context(stock_date_end=date):
|
|
return (self.get_available_space() - extra_space) >= 0
|
|
|
|
def get_space(self, name=None):
|
|
""" Location space calculation"""
|
|
pool = Pool()
|
|
UOM = pool.get('product.uom')
|
|
ModelData = pool.get('ir.model.data')
|
|
meter = UOM(ModelData.get_id('product', 'uom_meter'))
|
|
square_meter = UOM(ModelData.get_id('product', 'uom_square_meter'))
|
|
|
|
if not self.control_space:
|
|
if self.type not in ['view', 'warehouse', 'storage']:
|
|
return 0
|
|
space = 0
|
|
for c in self.childs:
|
|
if self.type == 'warehouse' and c.id in [self.output_location.id, self.input_location.id]:
|
|
continue
|
|
space += c.get_space()
|
|
return space
|
|
value = UOM.compute_qty(self.length_uom, getattr(self, 'length', float(0)), meter)
|
|
if self.space_measure == 'surface':
|
|
other_value = UOM.compute_qty(self.width_uom, getattr(self, 'width', float(0)), meter)
|
|
value *= other_value if other_value else 1
|
|
value = UOM.round(value, square_meter.rounding)
|
|
return value
|
|
|
|
def get_used_space(self, name=None, at_date=None):
|
|
""" Used space in a location at date"""
|
|
pool = Pool()
|
|
Product = pool.get('product.product')
|
|
Date_ = pool.get('ir.date')
|
|
|
|
if not self.check_space():
|
|
return 0
|
|
|
|
context = {'forecast': True}
|
|
if at_date:
|
|
context['stock_date_end'] = at_date
|
|
else:
|
|
context['stock_date_end'] = Transaction().context.get('stock_date_end')
|
|
if not Transaction().context.get('stock_date_end'):
|
|
context['stock_date_end'] = Date_.today()
|
|
|
|
product_ids = Product.get_products_occupy_space()
|
|
with Transaction().set_context(context):
|
|
pbl = Product.products_by_location(location_ids=self._get_space_availability_locations(),
|
|
product_ids=product_ids,
|
|
grouping=('product', ),
|
|
with_childs=(self.type == 'view'))
|
|
space = float(0)
|
|
for key, quantity in pbl.iteritems():
|
|
product = Product(key[1])
|
|
space += product.get_space(quantity)
|
|
|
|
return space
|
|
|
|
def get_available_space(self, name=None, at_date=None):
|
|
""" Available space in a location at date"""
|
|
return self.get_space() - self.get_used_space(name, at_date)
|
|
|
|
def _get_space_availability_locations(self):
|
|
return [self.id]
|
|
|
|
def check_space(self):
|
|
if self.control_space:
|
|
return True
|
|
if self.type in ['view', 'warehouse'] and self.childs:
|
|
return True
|
|
return False
|
|
|
|
|
|
class Move:
|
|
__name__ = 'stock.move'
|
|
|
|
@classmethod
|
|
def __setup__(cls):
|
|
super(Move, cls).__setup__()
|
|
cls._error_messages.update({'check_storage_space': 'Products cannot be stored in location "%s" at date "%s" '
|
|
'due to there is not enough space.'})
|
|
|
|
@classmethod
|
|
def create(cls, vlist):
|
|
moves = super(Move, cls).create(vlist)
|
|
cls.check_storage_space(moves)
|
|
return moves
|
|
|
|
@classmethod
|
|
def write(cls, *args):
|
|
super(Move, cls).write(*args)
|
|
actions = iter(args)
|
|
for moves, values in zip(actions, actions):
|
|
cls.check_storage_space(moves)
|
|
|
|
@classmethod
|
|
def check_storage_space(cls, moves, raise_error=True):
|
|
""" Validates if storage space in locations is overloaded
|
|
|
|
moves is the list of movements. They can be existing movements or
|
|
new ones that will be persisted.
|
|
raise_error determines if error must be raised or not
|
|
|
|
Returns True if no overload.
|
|
"""
|
|
pool = Pool()
|
|
Configuration = pool.get('stock.configuration')
|
|
Location = pool.get('stock.location')
|
|
conf = Configuration(1)
|
|
Lang = pool.get('ir.lang')
|
|
language = Transaction().language
|
|
|
|
if not conf.space_control:
|
|
return True
|
|
|
|
to_review = cls._get_locations_to_check_space(moves)
|
|
|
|
if not to_review:
|
|
return True
|
|
|
|
# verify space per location and date
|
|
for key, extra_space in to_review.iteritems():
|
|
loc = key[0]
|
|
date = key[1]
|
|
location = Location(loc)
|
|
if not location.storage_try(date, extra_space):
|
|
if not raise_error:
|
|
return False
|
|
languages = Lang.search([('code', '=', language)])
|
|
if not languages:
|
|
languages = Lang.search([('code', '=', 'en_US')])
|
|
language, = languages
|
|
formatted = Lang.strftime(date, language.code, language.date)
|
|
if getattr(location, 'overload_behavior', 'warn') == 'warn':
|
|
cls.raise_user_warning('%s@%s.check_storage_space' % (loc, date),
|
|
'check_storage_space',
|
|
(location.rec_name, formatted))
|
|
elif getattr(location, 'overload_behavior', 'warn') == 'stop':
|
|
cls.raise_user_error('check_storage_space', (location.rec_name, formatted))
|
|
return True
|
|
|
|
@classmethod
|
|
def _get_locations_to_check_space(cls, moves, grouping=('location', 'date')):
|
|
""" Collects Locations that might be checked for storage space control.
|
|
|
|
Returns a dictionary with location id and grouping as key
|
|
and quantity as extra space needed for non persisted movements given.
|
|
"""
|
|
pool = Pool()
|
|
Product = pool.get('product.product')
|
|
|
|
if 'location' not in grouping:
|
|
raise ValueError('Grouping param has not field "location"')
|
|
if len(grouping) > 1 and 'date' not in grouping:
|
|
raise ValueError('Given grouping fields are not supported (%s)' % grouping)
|
|
|
|
locations = {}
|
|
for m in moves:
|
|
if not m.product.occupy_space:
|
|
continue
|
|
if not m.planned_date and not m.effective_date:
|
|
continue
|
|
# collects locations and dates to check
|
|
date = getattr(m, 'effective_date', None)
|
|
if not date:
|
|
date = m.planned_date
|
|
key_from = (m.from_location.id,)
|
|
key_to = (m.to_location.id,)
|
|
if 'date' in grouping:
|
|
key_from = key_from + (date,)
|
|
key_to = key_to + (date,)
|
|
if m.from_location.check_space():
|
|
locations.setdefault(key_from, float(0))
|
|
if m.to_location.check_space():
|
|
locations.setdefault(key_to, float(0))
|
|
# if moves are not persisted we must to include them in space computing
|
|
if not getattr(m, 'id', None):
|
|
product = Product(m.product.id)
|
|
qty = product.get_space(m.quantity, m.uom)
|
|
if m.from_location.check_space():
|
|
locations[key_from] = locations[key_from] - qty
|
|
if m.to_location.check_space():
|
|
locations[key_to] = locations[key_to] + qty
|
|
return locations
|
|
|
|
|
|
class LocationSpaceAvailability(ModelSQL, ModelView):
|
|
"""Location space availability"""
|
|
__name__ = 'stock.location_space_availability'
|
|
|
|
date = fields.Date('Date')
|
|
max_quantity = fields.Function(fields.Float('Maximum space'), 'get_max_quantity')
|
|
quantity = fields.Function(fields.Float('Used space'), 'get_quantity')
|
|
|
|
@classmethod
|
|
def __setup__(cls):
|
|
super(LocationSpaceAvailability, cls).__setup__()
|
|
cls._order.insert(0, ('date', 'ASC'))
|
|
|
|
@staticmethod
|
|
def table_query():
|
|
pool = Pool()
|
|
Move = pool.get('stock.move')
|
|
Location = pool.get('stock.location')
|
|
Template = pool.get('product.template')
|
|
Product = pool.get('product.product')
|
|
move = Move.__table__()
|
|
template = Template.__table__()
|
|
product = Product.__table__()
|
|
|
|
location = Transaction().context.get('location', -1)
|
|
location_query = Location.search([
|
|
('parent', 'child_of', [location]),
|
|
], query=True, order=[])
|
|
date_column = Coalesce(move.effective_date, move.planned_date
|
|
).as_('date')
|
|
return move.join(product, condition=move.product == product.id
|
|
).join(template, condition=product.template == template.id
|
|
).select(
|
|
Max(move.id).as_('id'),
|
|
Literal(0).as_('create_uid'),
|
|
Now().as_('create_date'),
|
|
Literal(None).as_('write_uid'),
|
|
Literal(None).as_('write_date'),
|
|
date_column,
|
|
where=template.occupy_space
|
|
& (move.from_location.in_(location_query)
|
|
| move.to_location.in_(location_query))
|
|
& (Coalesce(move.effective_date, move.planned_date) != None),
|
|
group_by=(date_column, move.product))
|
|
|
|
@classmethod
|
|
def get_max_quantity(cls, lines, name):
|
|
pool = Pool()
|
|
_Location = pool.get('stock.location')
|
|
|
|
location_id = Transaction().context.get('location')
|
|
|
|
location = _Location(location_id)
|
|
_space = location.space
|
|
dates = sorted(l.date for l in lines)
|
|
quantities = {}
|
|
date_start = None
|
|
for date in dates:
|
|
quantities[date] = _space
|
|
try:
|
|
date_start = date + datetime.timedelta(1)
|
|
except OverflowError:
|
|
pass
|
|
return dict((l.id, quantities[l.date]) for l in lines)
|
|
|
|
@classmethod
|
|
def get_quantity(cls, lines, name):
|
|
_Location = Pool().get('stock.location')
|
|
|
|
location_id = Transaction().context.get('location')
|
|
location = _Location(location_id)
|
|
dates = sorted(l.date for l in lines)
|
|
quantities = {}
|
|
date_start = None
|
|
for date in dates:
|
|
context = {
|
|
'stock_date_start': date_start,
|
|
'stock_date_end': date,
|
|
'forecast': True,
|
|
}
|
|
with Transaction().set_context(**context):
|
|
quantities[date] = location.get_used_space()
|
|
try:
|
|
date_start = date + datetime.timedelta(1)
|
|
except OverflowError:
|
|
pass
|
|
cumulate = 0
|
|
for date in dates:
|
|
cumulate += quantities[date]
|
|
quantities[date] = cumulate
|
|
|
|
return dict((l.id, quantities[l.date]) for l in lines)
|
|
|
|
|
|
class OpenLocationSpaceAvailability(Wizard):
|
|
"""Open location space availability"""
|
|
__name__ = 'stock.location_space_availability'
|
|
start_state = 'open_'
|
|
|
|
open_ = StateAction('stock_storage_space.act_location_space_availability')
|
|
|
|
def do_open_(self, action):
|
|
Date = Pool().get('ir.date')
|
|
action['pyson_context'] = PYSONEncoder().encode({
|
|
'location': Transaction().context['active_id']
|
|
})
|
|
action['pyson_search_value'] = PYSONEncoder().encode([
|
|
('date', '>=', Date.today()),
|
|
])
|
|
return action, {} |