trytond-stock_storage_space/stock.py

449 lines
17 KiB
Python

# The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
import datetime
from sql import Literal
from sql.aggregate import Max
from sql.conditionals import Coalesce
from sql.functions import Now
from trytond.model import fields, ModelSQL, ModelView
from trytond.pool import PoolMeta, Pool
from trytond.pyson import Eval, Equal, Not, Bool, Id, And, PYSONEncoder
from trytond.transaction import Transaction
from trytond.wizard import Wizard, StateAction
__all__ = ['Configuration', 'Location', 'Move',
'LocationSpaceAvailability', 'OpenLocationSpaceAvailability']
__metaclass__ = PoolMeta
class Configuration:
__name__ = 'stock.configuration'
space_control = fields.Boolean('Space control enabled',
help='Enables storage space control in locations for products which occupies space')
@staticmethod
def default_space_control():
return True
class Location:
__name__ = 'stock.location'
control_space = fields.Boolean('Enable space control',
help='Enables storage space control',
states={'invisible': Not(Equal(Eval('type'), 'storage'))},
depends=['type'])
space_measure = fields.Selection([('length', 'Length'),
('surface', 'Surface')], 'Space measure',
states={'invisible': Not(Equal(Eval('type'), 'storage')),
'required': Bool(Eval('control_space'))},
depends=['type', 'control_space'])
overload_behavior = fields.Selection([('warn', 'Warn'),
('stop', 'Stop')],
'Overload behavior',
help='Determines system behavior when storage space is overloaded',
states={'required': Bool(Eval('control_space')),
'invisible': Not(Equal(Eval('type'), 'storage'))},
depends=['type', 'control_space'])
available_space = fields.Function(fields.Float('Available space at date'), 'get_available_space')
space = fields.Function(fields.Float('Space capacity',
digits=(16, Eval('space_unit_digits', 2)),
depends=['space_unit_digits']),
'get_space')
space_unit = fields.Function(
fields.Many2One('product.uom', 'Space UOM',
domain=[('category', 'in', [Id('product', 'uom_cat_surface'),
Id('product', 'uom_cat_length')])],
states={'invisible': Not(Equal(Eval('type'), 'storage')),
'required': Bool(Eval('control_space'))},
depends=['type', 'control_space']),
'on_change_with_space_unit')
space_unit_digits = fields.Function(fields.Integer('Space Digits'),
'on_change_with_space_unit_digits')
@classmethod
def __setup__(cls):
super(Location, cls).__setup__()
if cls.length.states.get('required'):
cls.length.states['required'] |= Bool(Eval('control_space'))
else:
cls.length.states['required'] = Bool(Eval('control_space'))
cls.length.depends.append('control_space')
if cls.width.states.get('required'):
cls.width.states['required'] |= And(Bool(Eval('control_space')), Equal(Eval('space_measure'), 'surface'))
else:
cls.width.states['required'] = And(Bool(Eval('control_space')), Equal(Eval('space_measure'), 'surface'))
cls.width.depends.extend(['control_space', 'space_measure'])
@staticmethod
def default_space_measure():
return 'length'
@staticmethod
def default_space_unit():
pool = Pool()
Modeldata = pool.get('ir.model.data')
res = Location.default_space_measure()
if res == 'length':
return Modeldata.get_id('product', 'uom_meter')
if res == 'surface':
return Modeldata.get_id('product', 'uom_square_meter')
@staticmethod
def default_control_space():
return False
@staticmethod
def default_overload_behavior():
return 'warn'
@fields.depends('height_uom')
def on_change_with_space_unit_digits(self, name=None):
return (self.space_unit.digits if self.space_unit
else self.default_space_unit_digits())
@staticmethod
def default_space_unit_digits():
return 2
@fields.depends('control_space')
def on_change_control_space(self):
pool = Pool()
Uom = pool.get('product.uom')
if not self.control_space:
return
self.space_measure = 'length'
self.overload_behavior = 'warn'
self.space_unit = Uom(self.on_change_with_space_unit())
@fields.depends('space_measure')
def on_change_with_space_unit(self, name=None, value=None):
pool = Pool()
Modeldata = pool.get('ir.model.data')
if not value:
value = self.space_measure
if value == 'length':
return Modeldata.get_id('product', 'uom_meter')
if value == 'surface':
return Modeldata.get_id('product', 'uom_square_meter')
return None
@classmethod
def write(cls, *args):
super(Location, cls).write(*args)
locations = sum(args[::2], [])
cls._set_space_measure(locations)
#todo: control children configuration
@classmethod
def _set_space_measure(cls, locations):
to_update = set()
for location in locations:
if location.parent and location.parent.space_measure != location.space_measure:
to_update.add(location.parent)
if to_update:
cls.write(list(to_update), {
'space_measure': location.space_measure})
to_update.clear()
def storage_try(self, date, extra_space=float(0)):
""" Determines if storing in location at date is possible"""
with Transaction().set_context(stock_date_end=date):
return (self.get_available_space() - extra_space) >= 0
def get_space(self, name=None):
""" Location space calculation"""
pool = Pool()
UOM = pool.get('product.uom')
ModelData = pool.get('ir.model.data')
meter = UOM(ModelData.get_id('product', 'uom_meter'))
square_meter = UOM(ModelData.get_id('product', 'uom_square_meter'))
if not self.control_space:
if self.type not in ['view', 'warehouse', 'storage']:
return 0
space = 0
for c in self.childs:
if self.type == 'warehouse' and c.id in [self.output_location.id, self.input_location.id]:
continue
space += c.get_space()
return space
value = UOM.compute_qty(self.length_uom, getattr(self, 'length', float(0)), meter)
if self.space_measure == 'surface':
other_value = UOM.compute_qty(self.width_uom, getattr(self, 'width', float(0)), meter)
value *= other_value if other_value else 1
value = UOM.round(value, square_meter.rounding)
return value
def get_used_space(self, name=None, at_date=None):
""" Used space in a location at date"""
pool = Pool()
Product = pool.get('product.product')
Date_ = pool.get('ir.date')
if not self.check_space():
return 0
context = {'forecast': True}
if at_date:
context['stock_date_end'] = at_date
else:
context['stock_date_end'] = Transaction().context.get('stock_date_end')
if not Transaction().context.get('stock_date_end'):
context['stock_date_end'] = Date_.today()
product_ids = Product.get_products_occupy_space()
with Transaction().set_context(context):
pbl = Product.products_by_location(location_ids=self._get_space_availability_locations(),
product_ids=product_ids,
grouping=('product', ),
with_childs=(self.type == 'view'))
space = float(0)
for key, quantity in pbl.iteritems():
product = Product(key[1])
space += product.get_space(quantity)
return space
def get_available_space(self, name=None, at_date=None):
""" Available space in a location at date"""
return self.get_space() - self.get_used_space(name, at_date)
def _get_space_availability_locations(self):
return [self.id]
def check_space(self):
if self.control_space:
return True
if self.type in ['view', 'warehouse'] and self.childs:
return True
return False
class Move:
__name__ = 'stock.move'
@classmethod
def __setup__(cls):
super(Move, cls).__setup__()
cls._error_messages.update({'check_storage_space': 'Products cannot be stored in location "%s" at date "%s" '
'due to there is not enough space.'})
@classmethod
def create(cls, vlist):
moves = super(Move, cls).create(vlist)
cls.check_storage_space(moves)
return moves
@classmethod
def write(cls, *args):
super(Move, cls).write(*args)
actions = iter(args)
for moves, values in zip(actions, actions):
cls.check_storage_space(moves)
@classmethod
def check_storage_space(cls, moves, raise_error=True):
""" Validates if storage space in locations is overloaded
moves is the list of movements. They can be existing movements or
new ones that will be persisted.
raise_error determines if error must be raised or not
Returns True if no overload.
"""
pool = Pool()
Configuration = pool.get('stock.configuration')
Location = pool.get('stock.location')
conf = Configuration(1)
Lang = pool.get('ir.lang')
language = Transaction().language
if not conf.space_control:
return True
to_review = cls._get_locations_to_check_space(moves)
if not to_review:
return True
# verify space per location and date
for key, extra_space in to_review.iteritems():
loc = key[0]
date = key[1]
location = Location(loc)
if not location.storage_try(date, extra_space):
if not raise_error:
return False
languages = Lang.search([('code', '=', language)])
if not languages:
languages = Lang.search([('code', '=', 'en_US')])
language, = languages
formatted = Lang.strftime(date, language.code, language.date)
if getattr(location, 'overload_behavior', 'warn') == 'warn':
cls.raise_user_warning('%s@%s.check_storage_space' % (loc, date),
'check_storage_space',
(location.rec_name, formatted))
elif getattr(location, 'overload_behavior', 'warn') == 'stop':
cls.raise_user_error('check_storage_space', (location.rec_name, formatted))
return True
@classmethod
def _get_locations_to_check_space(cls, moves, grouping=('location', 'date')):
""" Collects Locations that might be checked for storage space control.
Returns a dictionary with location id and grouping as key
and quantity as extra space needed for non persisted movements given.
"""
pool = Pool()
Product = pool.get('product.product')
if 'location' not in grouping:
raise ValueError('Grouping param has not field "location"')
if len(grouping) > 1 and 'date' not in grouping:
raise ValueError('Given grouping fields are not supported (%s)' % grouping)
locations = {}
for m in moves:
if not m.product.occupy_space:
continue
if not m.planned_date and not m.effective_date:
continue
# collects locations and dates to check
date = getattr(m, 'effective_date', None)
if not date:
date = m.planned_date
key_from = (m.from_location.id,)
key_to = (m.to_location.id,)
if 'date' in grouping:
key_from = key_from + (date,)
key_to = key_to + (date,)
if m.from_location.check_space():
locations.setdefault(key_from, float(0))
if m.to_location.check_space():
locations.setdefault(key_to, float(0))
# if moves are not persisted we must to include them in space computing
if not getattr(m, 'id', None):
product = Product(m.product.id)
qty = product.get_space(m.quantity, m.uom)
if m.from_location.check_space():
locations[key_from] = locations[key_from] - qty
if m.to_location.check_space():
locations[key_to] = locations[key_to] + qty
return locations
class LocationSpaceAvailability(ModelSQL, ModelView):
"""Location space availability"""
__name__ = 'stock.location_space_availability'
date = fields.Date('Date')
max_quantity = fields.Function(fields.Float('Maximum space'), 'get_max_quantity')
quantity = fields.Function(fields.Float('Used space'), 'get_quantity')
@classmethod
def __setup__(cls):
super(LocationSpaceAvailability, cls).__setup__()
cls._order.insert(0, ('date', 'ASC'))
@staticmethod
def table_query():
pool = Pool()
Move = pool.get('stock.move')
Location = pool.get('stock.location')
Template = pool.get('product.template')
Product = pool.get('product.product')
move = Move.__table__()
template = Template.__table__()
product = Product.__table__()
location = Transaction().context.get('location', -1)
location_query = Location.search([
('parent', 'child_of', [location]),
], query=True, order=[])
date_column = Coalesce(move.effective_date, move.planned_date
).as_('date')
return move.join(product, condition=move.product == product.id
).join(template, condition=product.template == template.id
).select(
Max(move.id).as_('id'),
Literal(0).as_('create_uid'),
Now().as_('create_date'),
Literal(None).as_('write_uid'),
Literal(None).as_('write_date'),
date_column,
where=template.occupy_space
& (move.from_location.in_(location_query)
| move.to_location.in_(location_query))
& (Coalesce(move.effective_date, move.planned_date) != None),
group_by=(date_column, move.product))
@classmethod
def get_max_quantity(cls, lines, name):
pool = Pool()
_Location = pool.get('stock.location')
location_id = Transaction().context.get('location')
location = _Location(location_id)
_space = location.space
dates = sorted(l.date for l in lines)
quantities = {}
date_start = None
for date in dates:
quantities[date] = _space
try:
date_start = date + datetime.timedelta(1)
except OverflowError:
pass
return dict((l.id, quantities[l.date]) for l in lines)
@classmethod
def get_quantity(cls, lines, name):
_Location = Pool().get('stock.location')
location_id = Transaction().context.get('location')
location = _Location(location_id)
dates = sorted(l.date for l in lines)
quantities = {}
date_start = None
for date in dates:
context = {
'stock_date_start': date_start,
'stock_date_end': date,
'forecast': True,
}
with Transaction().set_context(**context):
quantities[date] = location.get_used_space()
try:
date_start = date + datetime.timedelta(1)
except OverflowError:
pass
cumulate = 0
for date in dates:
cumulate += quantities[date]
quantities[date] = cumulate
return dict((l.id, quantities[l.date]) for l in lines)
class OpenLocationSpaceAvailability(Wizard):
"""Open location space availability"""
__name__ = 'stock.location_space_availability'
start_state = 'open_'
open_ = StateAction('stock_storage_space.act_location_space_availability')
def do_open_(self, action):
Date = Pool().get('ir.date')
action['pyson_context'] = PYSONEncoder().encode({
'location': Transaction().context['active_id']
})
action['pyson_search_value'] = PYSONEncoder().encode([
('date', '>=', Date.today()),
])
return action, {}