# Source: trytond-stock_unit_load/unit_load.py
# (2398 lines, 90 KiB, Python)

# The COPYRIGHT file at the top level of this repository contains the full
# copyright notices and license terms.
import time
import datetime
import dateutil.tz
from functools import partial
from itertools import groupby
from sql import With
from sql import Literal, Null
from sql.aggregate import Max, Sum, Min
from sql.conditionals import Coalesce, Case
from sql.operators import Concat, Like
from sql.functions import Round
from trytond import backend
from trytond.exceptions import UserError
from trytond.modules.company import CompanyReport
from trytond.pool import Pool
from trytond.pyson import Not, Eval, Bool, Equal, Date, If, Id, PYSONDecoder
from trytond.model import fields, ModelView, ModelSQL, Model, Check
from trytond.transaction import Transaction
from trytond.modules.stock_move_time.stock import DATE_FORMAT
from trytond.wizard import Wizard, StateTransition, StateView, Button
from trytond.rpc import RPC
from trytond.cache import Cache
from trytond.i18n import gettext
# Names exported by this module.
__all__ = ['UnitLoad', 'UnitLoadMove', 'MoveUnitLoad',
    'MoveUnitLoadStart', 'UnitLoadLabel', 'DropUnitLoadData',
    'DropUnitLoad', 'DropUnitLoadFailed', 'DropUnitLoadFailedProduct',
    'BatchDropUnitLoad', 'BatchDropUnitLoadData', 'BatchDropUnitLoadConfirm',
    'DropUnitLoadUL', 'DropUnitLoadEndDate', 'CaseLabel']
# Unit load field names. The consumer of this list is not visible in this
# part of the file -- presumably the fields whose change must be propagated
# to the production moves; TODO confirm against its usage.
MOVE_CHANGES = ['product', 'uom', 'production_type', 'production_location',
    'warehouse', 'production_moves', 'moves', 'production_state', 'company',
    'quantity', 'start_date', 'end_date', 'cases_quantity']
class UnitLoad(ModelSQL, ModelView):
"""Unit load"""
__name__ = 'stock.unit_load'
_rec_name = 'code'
code = fields.Char('Code', required=True, select=True,
states={'readonly': Eval('code_readonly', True)},
depends=['code_readonly'])
code_readonly = fields.Function(fields.Boolean('Code Readonly'),
'get_code_readonly')
code_length = fields.Integer('Code Length', select=True, readonly=True)
company = fields.Many2One('company.company', 'Company', required=True,
states={
'readonly':
(Eval('state') != 'draft')
| (Eval('production_state') == 'done')
},
depends=['state', 'production_state'], select=True)
product = fields.Many2One('product.product', 'Product',
select=True,
ondelete='RESTRICT',
states={
'readonly': (
(Eval('state') != 'draft')
| (Eval('production_state') == 'done')),
'required': (
(Eval('production_state') == 'done')
| Eval('state').in_(['assigned', 'done']))
},
depends=['state', 'production_state'],)
uom = fields.Function(fields.Many2One('product.uom', 'UOM'),
'on_change_with_uom')
uom_category = fields.Function(
fields.Many2One('product.uom.category', 'UOM Category'),
'on_change_with_uom_category')
uom_digits = fields.Function(fields.Integer('UOM Digits'),
'on_change_with_uom_digits')
quantity = fields.Function(
fields.Float('Quantity', digits=(16, Eval('uom_digits', 2)),
states={'readonly': (Eval('state') != 'draft') | (
Eval('production_state') == 'done')},
depends=['uom_digits', 'state', 'production_state']),
'get_quantity', setter='set_quantity', searcher='search_quantity')
forecast_quantity = fields.Function(
fields.Float('Forecast Quantity', digits=(16, Eval('uom_digits', 2)),
depends=['uom_digits']),
'get_quantity', searcher='search_quantity')
internal_quantity = fields.Float('Internal Quantity', readonly=True)
moves = fields.One2Many('stock.move', 'unit_load', 'Moves',
readonly=True,
order=[('effective_date', 'ASC'), ('time_', 'ASC'), ('id', 'ASC')],
domain=[('company', '=', Eval('company', -1))],
depends=['company'])
location = fields.Function(
fields.Many2One('stock.location', 'Location', depends=['moves']),
'get_location')
locations = fields.Function(
fields.Many2Many('stock.location', None, None, 'Locations'),
'get_locations')
last_date = fields.Function(
fields.DateTime('Last date',
states={'readonly': Not(Equal(Eval('state'), 'draft'))},
depends=['state']),
'get_last_date')
last_moves = fields.Function(
fields.One2Many('stock.move', None, 'Last moves',
domain=[('unit_load', '=', Eval('id'))],
depends=['id']),
'get_last_moves')
ul_moves = fields.One2Many('stock.unit_load.move', 'unit_load',
'UL moves', readonly=True,
order=[('start_date', 'ASC'), ('end_date', 'ASC'), ('id', 'ASC')],
states={'invisible': Eval('production_state') != 'done'},
context={'unit_load': Eval('id')},
depends=['production_state', 'id'])
state = fields.Selection([
('staging', 'Staging'),
('draft', 'Draft'),
('assigned', 'Assigned'),
('done', 'Done'),
('cancelled', 'Cancelled')], 'State', select=True, readonly=True)
production_type = fields.Function(
fields.Selection([('location', 'Location')], 'Production type',
states={'readonly': (Eval('state') != 'draft') | (
Eval('production_state') == 'done')},
depends=['state', 'production_state']),
'get_production_type', setter='set_production_type')
warehouse = fields.Many2One('stock.location', 'Warehouse', select=True,
domain=[('type', '=', 'warehouse')],
states={'readonly': (Eval('state') != 'draft') | (
Eval('production_state') == 'done')},
depends=['state', 'production_state'])
warehouse_production = fields.Function(
fields.Many2One('stock.location', 'Warehouse production',
domain=[('type', '=', 'production')]),
'on_change_with_warehouse_production')
production_location = fields.Many2One('stock.location',
'Production location', select=True,
domain=[('type', '=', 'production'),
If(Bool(Eval('warehouse')),
('parent', 'child_of', Eval('warehouse_production')),
())],
states={
'readonly': (Eval('state') != 'draft') | (
Eval('production_state') == 'done'),
'required': Eval('production_type') == 'location',
'invisible': Eval('production_type') != 'location'
},
depends=['production_state', 'production_type', 'warehouse',
'warehouse_production', 'state'])
production_state = fields.Selection([
('running', 'Running'),
('done', 'Done')], 'Production state', readonly=True, select=True,
required=True)
production_moves = fields.Function(
fields.One2Many('stock.move', 'unit_load', 'Production moves',
domain=[['OR',
[('product', '=', Eval('product')),
('from_location.type', '=', 'production'),
('to_location.type', 'in', ('storage', 'supplier'))],
[('product', '!=', Eval('product')),
['OR',
[('from_location.type', '=', 'production'),
('to_location.type', 'in', ('storage', 'supplier'))],
[('to_location.type', '=', 'production'),
('from_location.type', 'in', ('storage', 'supplier'))]]
]],
('company', '=', Eval('company', -1))],
states={
'readonly': (
(Eval('state') != 'draft')
| (Eval('production_state') == 'done')),
'invisible': Eval('production_state') == 'done'
},
depends=['production_state', 'company', 'product']),
'get_production_moves', setter='set_production_moves')
available = fields.Function(fields.Boolean('Available',
depends=['locations']),
'get_available', searcher='search_available')
start_date = fields.DateTime('Start date', required=True,
format=DATE_FORMAT, states={
'readonly': (Eval('production_state') == 'done') | (
Eval('state') != 'draft')},
depends=['production_state', 'state'])
end_date = fields.DateTime('End date', required=True, format=DATE_FORMAT,
domain=[('end_date', '>=', Eval('start_date'))],
states={'readonly': (Eval('production_state') == 'done') | (
Eval('state') != 'draft')},
depends=['production_state', 'state', 'start_date'])
drop_moves = fields.Function(
fields.One2Many('stock.move', None, 'Drop moves'),
'get_drop_moves')
return_moves = fields.Function(
fields.One2Many('stock.move', None, 'Return moves'),
'get_return_moves')
location_type = fields.Function(
fields.Char('Location type', states={'invisible': Bool(True)},
depends=['location']),
'get_location_type')
dropped = fields.Boolean('Dropped', readonly=True, select=True)
cases_quantity = fields.Float('Cases', required=True,
digits=(16, Eval('cases_digits', 2)),
states={'readonly': (Eval('state') != 'draft') | (
Eval('production_state') == 'done')},
depends=['cases_digits', 'production_state', 'state'])
quantity_per_case = fields.Function(
fields.Float('Quantity per Case', digits=(16, Eval('uom_digits', 0)),
states={'readonly': (Eval('state') != 'draft') | (
Eval('production_state') == 'done')},
depends=['uom_digits', 'production_state', 'state']),
'get_quantity_per_case', setter='set_quantity_per_case')
cases_digits = fields.Function(fields.Integer('Cases Digits'),
'on_change_with_cases_digits')
available_cases_quantity = fields.Function(
fields.Float('Available cases',
digits=(16, Eval('cases_digits', 2)),
domain=[('available_cases_quantity', '>=', 0)],
depends=['cases_digits']),
'get_available_cases_quantity',
searcher='search_available_cases_quantity')
currency_digits = fields.Function(fields.Integer('Currency Digits'),
'get_currency_digits')
pallet_product = fields.Function(fields.Many2One('product.product',
'Pallet Product'), 'get_pallet_product')
case_product = fields.Function(fields.Many2One('product.product',
'Case Product'), 'get_case_product')
_product_category_cache = Cache('stock.unit_load.product_category_cache',
context=False)
shipment = fields.Reference('Shipment', selection='get_shipments',
readonly=True, select=True)
production_time = fields.Function(fields.TimeDelta('Production time'),
'get_production_time')
@classmethod
def __setup__(cls):
    """Configure ordering, SQL constraints, buttons and RPC of the model.

    Sets the default order (newest ``start_date`` first), adds a check
    constraint keeping ``internal_quantity`` non-negative, declares the
    workflow buttons, exposes ``auto_drop`` through RPC and defines the
    field sets that ``write`` refuses to modify in some situations.
    """
    super(UnitLoad, cls).__setup__()
    cls._order = [
        ('start_date', 'DESC'),
        ('id', 'DESC'),
        ]
    t = cls.__table__()
    cls._sql_constraints += [
        ('check_qty_pos', Check(t, t.internal_quantity >= 0),
            'stock_unit_load.msg_stock_unit_load_check_qty_pos'),
        ]
    cls._buttons.update({
        'move_try': {
            'icon': 'tryton-forward',
            'invisible': ((Eval('state') != 'done') | (~Eval('available'))
                | Eval('dropped')),
            'depends': ['state', 'available', 'dropped']
        },
        'assign': {
            'icon': 'tryton-forward',
            'invisible': (Eval('state') != 'draft'),
            'readonly': (Eval('production_state') == 'running') & (
                ~Eval('production_moves', [])),
            'depends': ['state', 'production_state', 'production_moves']
        },
        'do': {
            'icon': 'tryton-ok',
            'invisible': Eval('state') != 'assigned',
            'depends': ['state']
        },
        'cancel': {
            'icon': 'tryton-cancel',
            'invisible': Eval('state').in_(['cancelled', 'done']),
            'depends': ['state']
        },
        'draft': {
            'icon': 'tryton-undo',
            'invisible': ~Eval('state').in_(['assigned']),
            'depends': ['state']
        },
        'drop_wizard': {
            'icon': 'tryton-forward',
            'invisible': (Eval('production_state') != 'done') | (
                ~Eval('available')) | Eval('dropped'),
            # 'production_state' is evaluated above, so it must be listed
            # for the client to load it; 'state' is kept for compatibility.
            'depends': ['state', 'production_state', 'available',
                'dropped']
        }
    })
    cls.__rpc__.update({
        'auto_drop': RPC(readonly=False),
    })
    # Fields that write() refuses to change on produced, unavailable ULs.
    cls._deny_modify_not_available = {'product', 'internal_quantity'}
    # Fields that write() refuses to change once production is done.
    cls._deny_modify_done = {'internal_quantity'}
@classmethod
def __register__(cls, module_name):
    """Register the table and run the data migrations of this model.

    The presence of the ``production_state``, ``state`` and ``shipment``
    columns is checked before calling ``super().__register__`` so that
    each migration below only runs when its column was missing at that
    point.
    """
    cursor = Transaction().connection.cursor()
    table = cls.__table__()
    Move = Pool().get('stock.move')
    move_table = Move.__table__()
    tableh = cls.__table_handler__(module_name)
    pstate_column_exists = tableh.column_exist('production_state')
    state_column_exists = tableh.column_exist('state')
    shipment_column_exists = tableh.column_exist('shipment')
    super().__register__(module_name)
    # Migration from 5.0.0: persist column
    if not pstate_column_exists and \
            Move.__table_handler__().column_exist('unit_load'):
        # if exists done moves, production state is done
        cursor.execute(*table.join(move_table, condition=(
                table.id == move_table.unit_load)
            ).select(table.id,
                where=(move_table.state == 'done'),
                group_by=(table.id)
            )
        )
        ids = [r[0] for r in cursor.fetchall()]
        if ids:
            cursor.execute(*table.update(
                [table.production_state], ['done'],
                where=(table.id.in_(ids))
            ))
    if not shipment_column_exists:
        # Copy onto the unit load the shipment reference found on its
        # moves (customer, customer return and internal shipments only).
        # Rows are ordered by move end date, so for a unit load with
        # several shipments the last row processed wins.
        cursor.execute(*table.join(move_table, condition=(
                table.id == move_table.unit_load)
            ).select(
                table.id, move_table.shipment,
                where=(
                    Like(move_table.shipment, 'stock.shipment.out,%') |
                    Like(move_table.shipment,
                        'stock.shipment.out.return,%') |
                    Like(move_table.shipment,
                        'stock.shipment.internal,%')
                ),
                group_by=(table.id, move_table.shipment,
                    move_table.end_date),
                order_by=(table.id, move_table.end_date.asc)))
        for ul_id, shipment in cursor.fetchall():
            cursor.execute(*table.update(
                columns=[table.shipment],
                values=[shipment],
                where=table.id == ul_id))
    if not state_column_exists:
        # Compute the state of each unit load as the lowest-priority
        # state among the moves of its own product, falling back to the
        # default move state when there is no such move.
        priority = cls._states_priority()
        conditions = [
            (move_table.state == k, v) for k, v in priority.items()]
        cursor.execute(*table.join(move_table, 'LEFT', condition=(
                (move_table.unit_load == table.id) &
                (move_table.product == table.product))
            ).select(
                table.id.as_('unit_load'),
                Coalesce(
                    Min(Case(*conditions)),
                    priority[Move.default_state()]).as_('state'),
                group_by=table.id)
        )
        for ul_id, state in cursor.fetchall():
            # Map the numeric priority back to its state name.
            cursor.execute(*table.update(
                columns=[table.state],
                values=[Case(*[
                    (state == v, k) for k, v in priority.items()])],
                where=table.id == ul_id))
    # Migration from 5.6: rename state cancel to cancelled
    cursor.execute(*table.update(
        [table.state], ['cancelled'],
        where=table.state == 'cancel'))
    # 'product' is no longer mandatory at database level.
    tableh.not_null_action('product', action='remove')
@staticmethod
def default_code_readonly():
    """Return True when the stock configuration has a unit load sequence."""
    Configuration = Pool().get('stock.configuration')
    return bool(Configuration(1).unit_load_sequence)
def get_code_readonly(self, name):
    """Getter of ``code_readonly``: always True for an existing record."""
    return True
@staticmethod
def default_company():
    """Return the ``company`` value of the transaction context, if any."""
    return Transaction().context.get('company')
@classmethod
def default_state(cls):
    """Return the initial state of a unit load: ``'draft'``."""
    return 'draft'
@staticmethod
def default_start_date():
    """Return the current date and time (naive ``datetime.now()``)."""
    return datetime.datetime.now()
@staticmethod
def default_end_date():
    """Return the current date and time (naive ``datetime.now()``)."""
    return datetime.datetime.now()
@staticmethod
def default_dropped():
    """New unit loads are not dropped."""
    return False
@classmethod
def create(cls, vlist):
    """Create unit loads, filling ``code`` and ``code_length``.

    A missing code is taken from the ``unit_load_sequence`` configured for
    the company of the values (or the default company). The incoming
    dictionaries are copied, never modified.
    """
    Config = Pool().get('stock.configuration')
    prepared = [values.copy() for values in vlist]
    config = Config(1)
    fallback_company = cls.default_company()
    for values in prepared:
        if not values.get('code'):
            company = values.get('company', fallback_company)
            sequence = config.get_multivalue(
                'unit_load_sequence', company=company)
            values['code'] = sequence.get()
        values['code_length'] = len(values['code'])
    return super(UnitLoad, cls).create(prepared)
@classmethod
def write(cls, *args):
    """Write values, keeping ``code_length`` in sync and guarding fields.

    ``args`` is the usual flat sequence of ``records, values`` pairs.
    Changing a field listed in ``_deny_modify_done`` or
    ``_deny_modify_not_available`` triggers the corresponding check, which
    raises ``UserError`` when a record does not allow the change.
    """
    pairs = iter(args)
    checked_args = []
    for records, values in zip(pairs, pairs):
        if values.get('code'):
            # work on a copy so the caller's dictionary is left untouched
            values = values.copy()
            values['code_length'] = len(values['code'])
        changed_fields = set(values)
        if changed_fields & cls._deny_modify_done:
            cls._check_deny_modify_done(records)
        if changed_fields & cls._deny_modify_not_available:
            cls._check_deny_modify_not_available(records)
        checked_args += [records, values]
    super(UnitLoad, cls).write(*checked_args)
@classmethod
def _check_deny_modify_done(cls, records):
    """Raise ``UserError`` for the first record whose production is done."""
    offending = next(
        (r for r in records if r.production_state == 'done'), None)
    if offending is not None:
        raise UserError(gettext(
            'stock_unit_load.msg_stock_unit_load_ul_done',
            unit_load=offending.rec_name))
@classmethod
def _check_deny_modify_not_available(cls, records):
    """Raise ``UserError`` for the first produced, unavailable record."""
    offending = next(
        (r for r in records
            if r.production_state == 'done' and not r.available), None)
    if offending is not None:
        raise UserError(gettext(
            'stock_unit_load.msg_stock_unit_load_ul_not_available',
            unit_load=offending.rec_name))
def get_rec_name(self, name):
    """Return the record name.

    With ``ul_extended_rec_name`` set in the context the name also shows
    the product, the number of cases and the quantity with its unit.
    """
    if not Transaction().context.get('ul_extended_rec_name', False):
        return super().get_rec_name(name)
    cases_field = self.fields_get(fields_names=['cases_quantity'])[
        'cases_quantity']
    product_name = (self.product.rec_name if self.product else None) or ''
    uom_symbol = (self.uom.symbol if self.uom else None) or ''
    parts = (
        self.code,
        product_name,
        self.cases_quantity,
        cases_field['string'],
        self.quantity,
        uom_symbol)
    return '%s - %s (%s %s, %s %s)' % parts
@staticmethod
def _get_internal_quantity(quantity, uom, product):
    """Convert ``quantity`` from ``uom`` to the product default unit."""
    return Pool().get('product.uom').compute_qty(
        uom, quantity, product.default_uom, round=True)
@classmethod
def copy(cls, uls, default=None):
    """Duplicate unit loads together with their production moves.

    Each copy gets no code, no moves, a ``running`` production state and
    is not dropped; the production moves of the source are then copied
    and linked to the new unit load.
    """
    Move = Pool().get('stock.move')
    overrides = {} if default is None else default.copy()
    overrides.update({
        'code': None,
        'production_state': 'running',
        'moves': None,
        'ul_moves': None,
        'dropped': False,
        })
    duplicates = []
    for source in uls:
        duplicate, = super(UnitLoad, cls).copy([source], default=overrides)
        Move.copy(source.production_moves,
            default={'unit_load': duplicate.id})
        duplicates.append(duplicate)
    return duplicates
@fields.depends('product')
def on_change_with_uom(self, name=None):
    """Return the id of the product default unit, or None without product."""
    return self.product.default_uom.id if self.product else None
@fields.depends('product')
def on_change_with_uom_digits(self, name=None):
    """Return the digits of the product default unit (2 without product)."""
    return self.product.default_uom.digits if self.product else 2
@fields.depends('uom')
def on_change_with_uom_category(self, name=None):
    """Return the id of the unit category, or None without unit."""
    return self.uom.category.id if self.uom else None
@fields.depends('start_date', 'end_date')
def on_change_start_date(self):
    """Move ``end_date`` forward so it never precedes ``start_date``."""
    if not self.start_date:
        return
    if not self.end_date or self.start_date > self.end_date:
        self.end_date = self.start_date
@classmethod
def get_quantity(cls, records, name):
    """Getter of ``quantity`` and ``forecast_quantity``.

    With ``locations`` in the context the value comes from the stock
    computation for those locations; otherwise it is the stored
    ``internal_quantity`` of each record.
    """
    transaction = Transaction()
    location_ids = transaction.context.get('locations')
    if not location_ids:
        # no location filter: use the quantity persisted on the record
        return {record.id: record.internal_quantity for record in records}
    with transaction.set_context(cls._quantity_context(name)):
        return cls._get_quantity(records, name, location_ids)
@classmethod
def _quantity_context(cls, name):
    """Return the context overrides used to compute the field ``name``.

    For ``quantity`` a ``stock_date_end`` found in the transaction context
    is clamped to today when it lies in the future. For
    ``forecast_quantity`` the ``forecast`` flag is set and, when the
    context has no ``stock_date_end``, it defaults to ``date.max``.

    The transaction context is read instead of the freshly created (and
    therefore always empty) result dictionary, and today's date comes from
    ``ir.date``; the module-level ``Date`` is the PYSON expression class
    and cannot be used for that.
    """
    IrDate = Pool().get('ir.date')
    stock_date_end = Transaction().context.get('stock_date_end')
    new_context = {}
    if name == 'quantity':
        today = IrDate.today()
        if stock_date_end and stock_date_end > today:
            new_context['stock_date_end'] = today
    elif name == 'forecast_quantity':
        new_context['forecast'] = True
        if not stock_date_end:
            new_context['stock_date_end'] = datetime.date.max
    return new_context
@classmethod
def _get_quantity(cls, records, name, location_ids):
    """Return ``{unit load id: quantity}`` computed from stock moves.

    Only the quantity of the main product of each unit load is summed.
    When warehouses were replaced by their storage location (see
    ``_skip_warehouse_in_compute_quantities``) the storage figures are
    copied back under the warehouse id.
    """
    location_ids, wh_to_add, storage_to_remove = \
        cls._skip_warehouse_in_compute_quantities(location_ids)
    uls = dict.fromkeys(map(int, records), 0)
    # product is optional on a unit load; None never matches a stock key
    ul_products = {
        r.id: r.product.id if r.product else None for r in records}
    with_childs = Transaction().context.get(
        'with_childs', len(location_ids) == 1)
    quantities = cls._compute_quantities(records, location_ids,
        with_childs=with_childs)
    if wh_to_add:
        for wh, storage in wh_to_add.items():
            # iterate over a snapshot of the keys: entries are added and
            # removed inside the loop
            for key in list(quantities):
                if key[0] == storage:
                    quantities[(wh,) + key[1:]] = quantities[key]
                    if storage in storage_to_remove:
                        del quantities[key]
    for key, quantity in quantities.items():
        ul_id = key[-1]
        if (ul_id is not None
                and ul_id in uls
                and key[1] == ul_products[ul_id]):
            uls[ul_id] += quantity
    return uls
@classmethod
def _compute_quantities(cls, records, location_ids, with_childs=False,
        product_id=None):
    """Return stock quantities grouped by product and unit load.

    The computation is restricted to ``product_id`` when given, otherwise
    to the products of ``records``, and always to the given unit loads.
    An empty dict is returned when no query can be built.
    """
    Move = Pool().get('stock.move')
    if product_id:
        product_ids = [product_id]
    else:
        product_ids = [r.product.id for r in records if r.product]
    unit_load_ids = tuple(r.id for r in records)
    options = {
        'with_childs': with_childs,
        'grouping': ('product', 'unit_load',),
        'grouping_filter': (product_ids, unit_load_ids),
        }
    query = Move.compute_quantities_query(location_ids, **options)
    if query is None:
        return {}
    # TODO: we should filter by main product of UL (with a join or subselect)
    return Move.compute_quantities(query, location_ids, **options)
@classmethod
def _skip_warehouse_in_compute_quantities(cls, location_ids):
    """Replace warehouses by their storage location when requested.

    Only active with ``stock_skip_warehouse`` in the context. Returns a
    tuple ``(location_ids, wh_to_add, storage_to_remove)`` where
    ``wh_to_add`` maps each replaced warehouse id to its storage id and
    ``storage_to_remove`` holds the storage ids that were not part of the
    original request.
    """
    Location = Pool().get('stock.location')
    storage_to_remove = set()
    wh_to_add = {}
    if not Transaction().context.get('stock_skip_warehouse'):
        return location_ids, wh_to_add, storage_to_remove
    remaining = set(location_ids)
    for location in Location.browse(list(remaining)):
        if location.type != 'warehouse':
            continue
        remaining.remove(location.id)
        storage_id = location.storage_location.id
        if storage_id not in remaining:
            storage_to_remove.add(storage_id)
        remaining.add(storage_id)
        wh_to_add[location.id] = storage_id
    return list(remaining), wh_to_add, storage_to_remove
@classmethod
def set_quantity(cls, records, name, value):
    """Setter of ``quantity``: store a truthy value as internal quantity."""
    if not value:
        return
    cls.write(records, {'internal_quantity': value})
@classmethod
def search_quantity(cls, name, domain=None):
    """Searcher of the quantity fields, using ``locations`` of the context."""
    location_ids = Transaction().context.get('locations')
    return cls._search_quantity(name, location_ids, domain)
@classmethod
def _search_quantity(cls, name, location_ids, domain=None):
    """Return a domain on ``id`` matching ``domain`` on a quantity field.

    Without locations an empty domain is returned. Otherwise the stock
    quantity query, grouped by product and unit load, is joined with the
    unit load table on its main product and filtered by ``domain``.
    """
    pool = Pool()
    Move = pool.get('stock.move')
    ul = cls.__table__()
    if not location_ids:
        return []
    location_ids, _, _ = cls._skip_warehouse_in_compute_quantities(
        location_ids)
    with_childs = Transaction().context.get(
        'with_childs', len(location_ids) == 1)
    with Transaction().set_context(cls._quantity_context(name)):
        grouping = ('product', 'unit_load',)
        query = Move.compute_quantities_query(location_ids,
            with_childs=with_childs,
            grouping=grouping,
            grouping_filter=None)
        # Translate the domain into an SQL condition on the stock query.
        having_domain = getattr(cls, name)._field.convert_domain(domain, {
                None: (query, {}),
                }, cls)
        # Keep only the rows of the main product of each unit load.
        query = query.join(ul, condition=(
                (query.unit_load == ul.id) & (query.product == ul.product))
            ).select(query.location,
                query.product,
                query.unit_load,
                query.quantity,
                where=having_domain)
        # NOTE(review): with_childs is forced to True here while the query
        # above was built with the computed with_childs -- confirm intent.
        quantities = Move.compute_quantities(query, location_ids,
            with_childs=True,
            grouping=grouping,
            grouping_filter=None)
    record_ids = []
    for key, quantity in quantities.items():
        # pbl could return None in some keys
        if key[-1] is not None:
            record_ids.append(key[-1])
    return [('id', 'in', record_ids)]
@staticmethod
def default_available():
    """New unit loads are flagged as available by default."""
    return True
@classmethod
def get_available(cls, records, name=None):
res = {}
for record in records:
res[record.id] = not record.dropped
if not res[record.id]:
continue
if record.production_state != 'done':
res[record.id] = False
elif record.state == 'cancelled':
res[record.id] = False
elif record.shipment and record.shipment.__name__ in {
'stock.shipment.out',
'stock.shipment.in.return'}:
res[record.id] = False
return res
@classmethod
def search_available(cls, name, clause):
    """Searcher of ``available``.

    Builds a domain on ``production_state``, ``state``, ``dropped`` and
    ``shipment`` from a clause using ``=`` or ``!=``; any other operator
    yields an empty domain.
    """
    # For each supported operator: [negated operator, LIKE operator used
    # on the shipment reference], depending on the searched boolean.
    reverse = {
        '=': ['!=', 'not like' if clause[2] else 'like'],
        '!=': ['=', 'like' if clause[2] else 'not like']}
    result = []
    if clause[1] in reverse:
        operation = 'AND'
        # ('!=', True) and ('=', False) both switch the combinator to OR.
        if (clause[2] and clause[1] == '!=') or (
                not clause[2] and clause[1] == '='):
            operation = 'OR'
        result.extend([operation,
            ('production_state', reverse[clause[1]][0]
                if operation == 'AND' else clause[1], 'running'),
            ('state', reverse[clause[1]][0]
                if operation == 'AND' else clause[1], 'cancelled'),
            ('dropped', reverse[clause[1]][0], clause[2]),
            ['OR' if operation == 'AND' else 'AND',
                ('shipment', clause[1] if operation == 'AND'
                    else reverse[clause[1]][0], None),
                [operation,
                    ('shipment', reverse[clause[1]][1],
                        'stock.shipment.out,%'),
                    ('shipment', reverse[clause[1]][1],
                        'stock.shipment.in.return,%')
                ]
            ]
        ])
    return result
@classmethod
def get_available_locations_domain(cls):
    """Return the location domain ``[('type', '=', 'storage')]``."""
    return [('type', '=', 'storage')]
@classmethod
def get_location(cls, records, name=None, product_id=None, type=None,
        at_date=None):
    """Return ``{unit load id: location id}`` from the last moves.

    The location is the destination of the first of the last moves of each
    record (see ``get_last_moves``), or None when there is none.
    """
    Move = Pool().get('stock.move')
    result = {int(record): None for record in records}
    for record in records:
        last_move_ids = record.get_last_moves(product_id=product_id,
            location_type=type, at_date=at_date)
        if not last_move_ids:
            continue
        result[record.id] = Move(last_move_ids[0]).to_location.id
    return result
def get_location_type(self, name=None):
    """Return the type of the current location, or None without location."""
    return self.location.type if self.location else None
@classmethod
def _get_locations(cls, records, product_id=None, type=None):
    """Return ``{unit load id: [location ids]}`` holding positive stock.

    Candidate locations are the destinations of the moves of the records
    (optionally restricted to a location ``type``). Only quantities of
    ``product_id``, or of the main product of each unit load when it is
    not given, are considered.
    """
    locations = {}
    ul_products = {}
    destination_ids = set()
    for record in records:
        locations[record.id] = []
        if record.product and not product_id:
            ul_products[record.id] = record.product.id
        else:
            ul_products[record.id] = product_id
        for move in record.moves:
            if not type or move.to_location.type == type:
                destination_ids.add(move.to_location.id)
    quantities = cls._compute_quantities(records, list(destination_ids),
        product_id=product_id)
    for key, quantity in quantities.items():
        if quantity <= 0:
            continue
        ul_id = key[-1]
        if key[1] == ul_products[ul_id]:
            locations[ul_id].append(key[0])
    return locations
@classmethod
def get_locations(cls, records, name=None, product_id=None, type=None):
    """Getter of ``locations``: locations where each unit load has stock.

    The computation runs with inactive records included, with ``forecast``
    enabled unless the context already defines it, and with
    ``stock_date_end`` set to ``date.max`` unless ``stock_date_max`` is in
    the context.

    The overrides are applied through ``set_context`` only; the
    transaction context itself is no longer modified in place, so they do
    not leak to the rest of the transaction.
    """
    context = Transaction().context
    overrides = {'active_test': False}
    # NOTE(review): the guard reads 'stock_date_max' while the key written
    # is 'stock_date_end'; kept as in the original -- confirm the intent.
    if not context.get('stock_date_max'):
        overrides['stock_date_end'] = datetime.date.max
    if 'forecast' not in context:
        overrides['forecast'] = True
    with Transaction().set_context(overrides):
        return cls._get_locations(records, product_id, type)
def get_last_date(self, name=None):
    """Return the latest end date of the non-cancelled main-product moves.

    None is returned when there is no such move.
    """
    if not self.moves:
        return None
    end_dates = (
        move.end_date for move in self.moves
        if move.state != 'cancelled'
        and move.product.id == self.product.id)
    return max(end_dates, default=None)
def get_last_moves(self, name=None, product_id=None, location_type=None,
        at_date=None, check_start_date=False, **kwargs):
    """Return the ids of the moves that placed the unit load last.

    :param product_id: product whose moves are inspected; defaults to the
        main product of the unit load.
    :param location_type: when set, only moves going to a location of
        this type are candidates.
    :param at_date: when set, candidates dated after it are passed over
        in favour of an older one.
    :param check_start_date: compare on ``start_date`` (falling back to
        ``end_date``) instead of ``end_date``.
    :return: ids of every move of the unit load sharing the date and the
        destination of the selected candidate; ``[]`` without moves.
    """
    if not self.moves:
        return []
    def checked_date(move):
        # date used for the comparison, depending on check_start_date
        return check_start_date and move.start_date or move.end_date
    if not product_id:
        product_id = self.product.id
    max_date, location_id = None, None
    # -1 reverses the sort key tuple so start_date takes precedence
    tup_rev = check_start_date and -1 or 1
    # Newest first; missing dates sort as datetime.min.
    for move in sorted([m for m in self.moves if m.product.id == product_id
            ], key=lambda x: (
                x.end_date or x.start_date or datetime.datetime.min,
                x.start_date or datetime.datetime.min)[::tup_rev],
            reverse=True):
        if move.state == 'cancelled':
            continue
        if location_type and location_type != move.to_location.type:
            continue
        max_date = checked_date(move)
        location_id = move.to_location.id
        # Too recent for at_date: keep looking. If no older candidate
        # exists, the values of the oldest candidate remain selected.
        if at_date and at_date < checked_date(move):
            continue
        break
    return [m.id for m in self.moves if
        (check_start_date and m.start_date or m.end_date) == max_date and
        m.to_location.id == location_id]
def get_production_type(self, name=None):
    """Return ``'location'`` when a production location is set, else None."""
    return 'location' if self.production_location else None
@classmethod
def default_production_type(cls):
    """Return the configured production type, ``'location'`` if unset."""
    Configuration = Pool().get('stock.configuration')
    configured = Configuration(1).ul_production_type
    return configured or 'location'
@classmethod
def set_production_type(cls, records, name, value):
    """Setter of ``production_type``: intentionally stores nothing."""
    pass
@fields.depends('warehouse')
def on_change_with_warehouse_production(self, name=None):
    """Return the id of the warehouse production location, if any."""
    if not self.warehouse:
        return None
    return self.warehouse.production_location.id
@classmethod
def default_production_state(cls):
    """Return the initial production state: ``'running'``."""
    return 'running'
def get_production_moves(self, name=None):
    """Getter of ``production_moves``.

    For the ``location`` production type these are the moves leaving the
    production location towards a storage or supplier location, plus the
    moves of other products entering it from such a location. As soon as
    one of them is done, only those starting at the unit load start date
    are returned.
    """
    if not self.moves:
        return []
    candidates = []
    if self.production_type == 'location':
        outside_types = {'supplier', 'storage'}

        def is_output(move):
            return (move.from_location.type == 'production'
                and move.from_location.id == self.production_location.id
                and move.to_location.type in outside_types)

        def is_input(move):
            return (move.to_location.id == self.production_location.id
                and move.product.id != self.product.id
                and move.from_location.type in outside_types)

        candidates = [
            m for m in self.moves if is_output(m) or is_input(m)]
    if any(m.state == 'done' for m in candidates):
        candidates = [
            m for m in candidates if m.start_date == self.start_date]
    return [m.id for m in candidates]
@classmethod
def set_production_moves(cls, records, name, value):
    """Setter of ``production_moves``: forward the actions to ``moves``.

    ``remove`` actions are turned into deletions so that moves are never
    merely unlinked from the unit load.
    """
    if not value:
        return
    removal_actions = ('delete', 'remove')
    forwarded = [
        action for action in value if action[0] not in removal_actions]
    # never unlink, always delete
    ids_to_delete = [
        move_id
        for action in value if action[0] in removal_actions
        for move_id in action[1]]
    forwarded.append(('delete', ids_to_delete))
    cls.write(records, {'moves': forwarded})
@classmethod
def get_currency_digits(cls, records, name):
return {r.id: r.company.currency and r.company.currency.digits
for r in records}
@classmethod
def move(cls, records, to_location, at_date):
    """Create and save the moves taking ``records`` to ``to_location``.

    Each unit load is moved from its current storage location at
    ``at_date``. Returns the created stock moves.
    """
    pool = Pool()
    Location = pool.get('stock.location')
    Move = pool.get('stock.move')
    new_moves = []
    for record in records:
        origin_id = record.get_location([record],
            type='storage')[record.id]
        new_moves += record._move(to_location, at_date,
            Location(origin_id))
    if new_moves:
        Move.save(new_moves)
    return Move.browse([int(move) for move in new_moves])
def _move(self, to_location, at_date, from_location):
    """Validate the movement and return the new, unsaved moves for it."""
    self.check_to_move(from_location, to_location, at_date)
    move_values = {}
    move_values['from_location'] = from_location.id
    move_values['to_location'] = to_location.id
    move_values['start_date'] = at_date
    move_values['end_date'] = at_date
    return self._get_new_moves(move_values,
        location_type=from_location.type)
def check_to_move(self, from_location, to_location, at_date):
    """Check that the unit load can be moved; raise ``UserError`` if not.

    The movement is refused when there is no origin location, when the
    unit load is not in state ``done``, when origin and destination are
    the same location, or when ``at_date`` is earlier than the date of the
    last moves.
    """
    pool = Pool()
    Move = pool.get('stock.move')
    if not from_location:
        raise UserError(gettext(
            'stock_unit_load.msg_stock_unit_load_missing_location',
            unit_load=self.rec_name))
    if self.state != 'done':
        raise UserError(gettext(
            'stock_unit_load.msg_stock_unit_load_wrong_state',
            unit_load=self.rec_name))
    if to_location.id == from_location.id:
        raise UserError(gettext(
            'stock_unit_load.msg_stock_unit_load_wrong_move_location',
            unit_load=self.rec_name,
            location=to_location.rec_name))
    _max_date = max(m.end_date for m in self.last_moves)
    if from_location.type == 'storage' and \
            to_location.type == 'production':
        # allow overlapped drops
        if self.drop_moves:
            # compare against the last move into a storage location
            _last_moves = self.get_last_moves(location_type='storage')
            if _last_moves:
                _last_moves = Move.browse(_last_moves)
                _max_date = max(m.end_date for m in _last_moves)
    if _max_date > at_date:
        # Show the date in the company timezone (UTC when not set);
        # at_date is treated as UTC for the conversion.
        lzone = (dateutil.tz.gettz(self.company.timezone
            ) if self.company.timezone else dateutil.tz.tzutc())
        szone = dateutil.tz.tzutc()
        ftdate = at_date.replace(tzinfo=szone).astimezone(
            lzone).replace(tzinfo=None)
        raise UserError(gettext(
            'stock_unit_load.msg_stock_unit_load_wrong_move_date',
            unit_load=self.rec_name,
            date=ftdate))
def _get_new_moves(self, default_values=None, location_type=None,
        cases_quantity=None, **kwargs):
    """Return unsaved moves replicating the last moves of the unit load.

    :param default_values: values applied to every new move. A dictionary
        supplied by the caller is completed in place (``unit_load``,
        ``origin``, ``shipment``, ``planned_date`` and, when missing,
        ``from_location``), as before.
    :param location_type: restrict the last moves to those going to a
        location of this type.
    :param cases_quantity: number of cases to move; see
        ``_get_quantity_to_move``.
    :param kwargs: forwarded to ``get_last_moves`` and
        ``_get_quantity_to_move``.
    :return: list of new moves with a non-zero quantity, one per group of
        last moves (see ``_get_group_move_key``).
    """
    pool = Pool()
    Move = pool.get('stock.move')
    # A fresh dict per call: a shared ``{}`` default would carry values
    # over from one call to the next.
    if default_values is None:
        default_values = {}
    start_date = default_values.get('start_date')
    default_values.update({
        'unit_load': self.id,
        'origin': None,
        'shipment': None,
        'planned_date': start_date.date() if start_date else None
    })
    moves = []
    _last_moves = Move.browse(self.get_last_moves(
        location_type=location_type, **kwargs))
    if not default_values.get('from_location'):
        if not location_type:
            default_values['from_location'] = self.location.id
        elif _last_moves:
            default_values['from_location'] = _last_moves[0].to_location.id
    keyfunc = partial(self._get_group_move_key, _last_moves)
    # groupby only merges adjacent items, hence the sort on the same key
    data = sorted(_last_moves, key=keyfunc)
    for key, grouped_moves in groupby(data, key=keyfunc):
        _grouped_moves = list(grouped_moves)
        move = self._get_new_move(_grouped_moves[0], default_values)
        move.quantity = move.uom.round(self._get_quantity_to_move(
            _grouped_moves, move.product, move.uom, cases_quantity,
            **kwargs))
        if move.quantity:
            moves.append(move)
    return moves
def _get_quantity_to_move(self, _grouped_moves, product, uom,
        cases_quantity, **kwargs):
    """Return the quantity, in ``uom``, to move for a group of moves.

    The quantity of the group is scaled by the ratio between the cases to
    move (``cases_quantity`` or the available cases) and the cases of the
    unit load, capped at 1. A scaled quantity rounding to zero is
    discarded in favour of the full quantity.
    """
    Uom = Pool().get('product.uom')
    if cases_quantity == 0:
        return 0
    total = sum(
        Uom.compute_qty(move.uom, move.quantity, uom)
        for move in _grouped_moves)
    cases_to_move = cases_quantity or self.available_cases_quantity
    if cases_to_move == self.cases_quantity:
        return total
    factor = min(1.0, cases_to_move / self.cases_quantity)
    return uom.round(total * factor) or total
def _get_new_move(self, move, default_values):
    """Return an unsaved move built from ``default_values`` and ``move``.

    Product, unit, quantity, company, currency and unit price are taken
    from ``move``.
    """
    Move = Pool().get('stock.move')
    new_move = Move(**default_values)
    copied_fields = (
        'product', 'uom', 'quantity', 'company', 'currency', 'unit_price')
    for field_name in copied_fields:
        setattr(new_move, field_name, getattr(move, field_name))
    return new_move
def _get_group_move_key(self, moves, move):
    """Return the grouping key of ``move``: destination, product and lot.

    The lot is appended only when the ``stock.lot`` model is available in
    the pool and the move has one.
    """
    key = (move.to_location.id, move.product.id)
    try:
        # raises KeyError when the model is not in the pool
        Pool().get('stock.lot')
        if move.lot:
            key += (move.lot.id, )
    except KeyError:
        pass
    return key
@classmethod
def _states_priority(cls):
return {
'draft': 0,
'assigned': 1,
'done': 2,
'cancelled': 3
}
def _get_state(self):
    """Return the state derived from the unit load moves.

    It is the state with the lowest priority among the UL moves, or the
    default state of stock moves when there are none.
    """
    pool = Pool()
    Move = pool.get('stock.move')
    ULMove = pool.get('stock.unit_load.move')
    priority = self._states_priority()
    # do not use .ul_moves to reload records
    ul_moves = ULMove.search([
            ('unit_load', '=', self.id)])
    if not ul_moves:
        return Move.default_state()
    found_states = {ul_move.state for ul_move in ul_moves}
    return min(found_states, key=priority.__getitem__)
@classmethod
def set_state(cls, records):
    """Recompute and store the state of ``records``.

    Every record is written with its own computed state. Records are
    grouped by new state so that each state is written once; records whose
    state is already correct are left untouched.
    """
    by_state = {}
    for record in cls.browse(records):
        state = record._get_state()
        if record.state != state:
            by_state.setdefault(state, []).append(record)
    if not by_state:
        return
    to_write = []
    for state, state_records in by_state.items():
        to_write.extend((state_records, {'state': state}))
    cls.write(*to_write)
@fields.depends('cases_quantity')
def on_change_with_cases_digits(self, name=None):
    """Getter of ``cases_digits``: same value as ``default_cases_digits``."""
    return self.default_cases_digits()
@classmethod
def default_cases_digits(cls):
    """Return the digits of the ``Unit`` unit of measure of ``product``."""
    pool = Pool()
    Uom = pool.get('product.uom')
    ModelData = pool.get('ir.model.data')
    unit_id = ModelData.get_id('product', 'uom_unit')
    return Uom(unit_id).digits
@fields.depends('quantity_per_case', 'cases_quantity',
    methods=['on_change_quantity'])
def on_change_quantity_per_case(self):
    """Recompute ``quantity`` as quantity per case times number of cases."""
    if not (self.quantity_per_case and self.cases_quantity):
        return
    self.quantity = self.quantity_per_case * self.cases_quantity
    self.on_change_quantity()
@fields.depends('quantity', 'cases_quantity', 'uom')
def get_quantity_per_case(self, name=None):
    """Return the quantity divided by the cases, rounded to the unit.

    None is returned when quantity, cases or unit is missing or zero.
    """
    if not (self.quantity and self.cases_quantity and self.uom):
        return None
    return self.uom.round(self.quantity / self.cases_quantity)
@classmethod
def set_quantity_per_case(cls, records, name, value):
    """Setter for quantity per case: intentionally stores nothing."""
    return None
@classmethod
def get_available_cases_quantity(cls, records, name):
    """Return, per record id, the cases still available.

    Dropped unit loads have 0.0 available cases. Otherwise the cases
    proportional to the dropped quantity are subtracted from the total.
    """
    available = {}
    for record in records:
        remaining = record.cases_quantity
        if record.dropped:
            available[record.id] = 0.0
            continue
        dropped_qty = record._get_dropped_quantity()
        if dropped_qty:
            # share of the unit load already dropped, expressed in cases
            dropped_ratio = dropped_qty / record.internal_quantity
            remaining -= round(
                dropped_ratio * record.cases_quantity, record.cases_digits)
        available[record.id] = remaining
    return available
@classmethod
def search_available_cases_quantity(cls, name, clause):
    """Searcher for available cases.

    Builds a subquery selecting the unit loads whose total cases minus
    the dropped cases (0 when there are none) satisfy ``clause``.
    """
    table = cls.__table__()
    _, operator, value = clause
    Operator = fields.SQL_OPERATORS[operator]

    dropped = With()
    dropped.query = cls._get_available_cases_dropped_query()

    dropped_cases = Coalesce(dropped.cases_quantity, 0)
    available_cases = table.cases_quantity - dropped_cases
    joined = table.join(dropped, 'LEFT',
        condition=(dropped.unit_load == table.id))
    query = joined.select(
        table.id,
        group_by=(table.id, table.cases_quantity, dropped_cases),
        having=Operator(available_cases, value),
        with_=[dropped])
    return [('id', 'in', query)]
@classmethod
def _get_available_cases_dropped_query(cls):
    """Build the query giving the dropped cases per unit load.

    The query returns two columns: ``unit_load`` (unit load id) and
    ``cases_quantity`` (unit load cases weighted by the summed internal
    quantity of the matching moves over the unit load internal quantity,
    rounded and cast with the field's SQL cast).

    Only unit loads of the current user's company are considered.
    """
    pool = Pool()
    Move = pool.get('stock.move')
    Location = pool.get('stock.location')
    User = pool.get('res.user')
    unit_load2 = cls.__table__()
    move = Move.__table__()
    to_location = Location.__table__()
    from_location = Location.__table__()
    user = User(Transaction().user)
    if not user.company:
        # NOTE(review): an empty list is returned here, but the searcher
        # assigns the result to With().query; confirm callers cope with
        # a non-query value.
        return []
    company_id = user.company.id
    cast_ = cls.available_cases_quantity._field.sql_cast
    cases_quantity = cast_(Round(unit_load2.cases_quantity * Coalesce(
        Sum(move.internal_quantity), 0) / unit_load2.internal_quantity)
        )
    # only moves of the unit load's own product are taken into account
    query = unit_load2.join(move, condition=(
            (move.unit_load == unit_load2.id) &
            (move.product == unit_load2.product))
        ).join(from_location, condition=(
            move.from_location == from_location.id)
        ).join(to_location, condition=(
            move.to_location == to_location.id)
        ).select(
            unit_load2.id.as_('unit_load'),
            cases_quantity.as_('cases_quantity'),
            where=(
                # NOTE(review): both ends must be production/lost_found
                # here, whereas get_drop_moves requires the origin NOT
                # to be of those types; confirm which one is intended.
                (from_location.type.in_(['production', 'lost_found'])) &
                (to_location.type.in_(['production', 'lost_found'])) &
                (unit_load2.company == company_id)
            ),
            group_by=unit_load2.id,
        )
    return query
def get_case_product(self, name=None):
    """Return the product found for the ``case_category`` setting."""
    category_name = 'case_category'
    return self._get_product_by_category(category_name)
def get_pallet_product(self, name=None):
    """Return the product found for the ``pallet_category`` setting."""
    category_name = 'pallet_category'
    return self._get_product_by_category(category_name)
def _get_product_by_category(self, category_name):
    """Return the first production move product belonging to a category.

    ``category_name`` is the name of the stock configuration field that
    holds the category. The ids of that category and its children are
    read from (and stored in) ``_product_category_cache``. Returns None
    when there are no production moves, no configured category or no
    matching product.
    """
    pool = Pool()
    Category = pool.get('product.category')
    Configuration = pool.get('stock.configuration')

    if not self.production_moves:
        return None

    category_ids = self._product_category_cache.get(category_name, [])
    if not category_ids:
        root = getattr(Configuration(1), category_name, None)
        if not root:
            return None
        children = Category.search([('parent', 'child_of', root.id)])
        category_ids = [c.id for c in children]
        self._product_category_cache.set(category_name, category_ids)

    wanted = set(category_ids)
    for move in self.production_moves:
        if any(c.id in wanted for c in move.product.categories):
            return move.product
@fields.depends('cases_quantity', 'quantity_per_case',
    methods=['explode_production_moves'])
def on_change_cases_quantity(self):
    """Update the quantity from the cases and rebuild production moves.

    Nothing happens while the cases quantity is empty.
    """
    if self.cases_quantity:
        if self.quantity_per_case:
            self.quantity = self.cases_quantity * self.quantity_per_case
        self.explode_production_moves()
@classmethod
def delete(cls, records):
    """Delete unit loads together with their moves.

    Raises UserError when any unit load has its production done. The
    related moves are cancelled and deleted before the unit loads.
    """
    Move = Pool().get('stock.move')

    for record in records:
        if record.production_state != 'done':
            continue
        raise UserError(gettext(
            'stock_unit_load.msg_stock_unit_load_delete_done',
            unit_load=record.rec_name))

    def related_moves():
        # re-read on every call, as the original does for each step
        return [m for r in records for m in r.moves]

    Move.cancel(related_moves())
    Move.delete(related_moves())
    super(UnitLoad, cls).delete(records)
@classmethod
@ModelView.button_action('stock_unit_load.wizard_move_unit_load')
def move_try(cls, records):
    """Button bound to the move unit load wizard action; no own logic."""
    return None
@classmethod
@ModelView.button
def draft(cls, records):
    """Set back to draft the cancelled and assigned moves of ``records``.

    Raises UserError when one of those moves fails ``check_move_origin``.
    """
    Move = Pool().get('stock.move')

    resettable = ('cancelled', 'assigned')
    moves = []
    for record in records:
        moves.extend(m for m in record.moves if m.state in resettable)

    for move in moves:
        if cls.check_move_origin(move):
            continue
        raise UserError(gettext(
            'stock_unit_load.msg_stock_unit_load_state_origin',
            unit_load=move.unit_load.rec_name,
            origin=move.origin.rec_name))

    if moves:
        Move.draft(moves)
@classmethod
def get_moves_to_cancel(cls, records):
    """Return the draft and assigned moves of ``records``."""
    cancellable = ['draft', 'assigned']
    moves = []
    for record in records:
        for move in record.moves:
            if move.state in cancellable:
                moves.append(move)
    return moves
@classmethod
@ModelView.button
def cancel(cls, records):
    """Cancel and delete the moves given by ``get_moves_to_cancel``.

    Raises UserError when one of those moves fails ``check_move_origin``.
    """
    Move = Pool().get('stock.move')

    moves = cls.get_moves_to_cancel(records)
    for move in moves:
        if cls.check_move_origin(move):
            continue
        raise UserError(gettext(
            'stock_unit_load.msg_stock_unit_load_state_origin',
            unit_load=move.unit_load.rec_name,
            origin=move.origin.rec_name))

    if not moves:
        return
    Move.cancel(moves)
    Move.delete(moves)
@classmethod
def _assing_ul_moves(cls, records):
    """Assign every draft move of ``records``."""
    Move = Pool().get('stock.move')

    draft_moves = []
    for record in records:
        draft_moves.extend(m for m in record.moves if m.state == 'draft')
    if draft_moves:
        Move.assign(draft_moves)
@classmethod
@ModelView.button
def assign(cls, records):
    """Check origins and dates, then assign the draft moves.

    Raises UserError when a draft or assigned move fails
    ``check_move_origin``.
    """
    pending_states = ('draft', 'assigned')
    pending = [move for record in records for move in record.moves
        if move.state in pending_states]
    for move in pending:
        if cls.check_move_origin(move):
            continue
        raise UserError(gettext(
            'stock_unit_load.msg_stock_unit_load_state_origin',
            unit_load=move.unit_load.rec_name,
            origin=move.origin.rec_name))
    cls.check_dates(records)
    cls._assing_ul_moves(records)
@classmethod
@ModelView.button
def do(cls, records):
    """Do the pending moves of ``records`` and refresh derived states.

    Steps: align dates, check move origins (UserError on failure), do
    the draft/assigned moves, update the dropped flag, detach return
    moves from the unit load and finally set production state and state.
    """
    Move = Pool().get('stock.move')

    pending = [move for record in records for move in record.moves
        if move.state in ('draft', 'assigned')]
    cls.check_dates(records)
    for move in pending:
        if cls.check_move_origin(move):
            continue
        raise UserError(gettext(
            'stock_unit_load.msg_stock_unit_load_state_origin',
            unit_load=move.unit_load.rec_name,
            origin=move.origin.rec_name))
    if pending:
        Move.do(pending)
    cls.set_drop_state(records)

    # Return moves are unlinked from the unit load; those without an
    # origin get the unit load itself as origin.
    move_changes = []
    for record in records:
        if not record.return_moves:
            continue
        without_origin = [m for m in record.return_moves if not m.origin]
        with_origin = [m for m in record.return_moves if m.origin]
        move_changes += [
            without_origin, {
                'unit_load': None,
                'origin': 'stock.unit_load,%s' % record.id
                },
            with_origin, {
                'unit_load': None
                },
            ]
    if move_changes:
        Move.write(*move_changes)

    # set done production state
    cls.set_production_state(records)
    cls.set_state(records)
@classmethod
def set_production_state(cls, records, state='done'):
    """Write ``state`` as production state on the records lacking it."""
    outdated = [r for r in records if r.production_state != state]
    if not outdated:
        return
    cls.write(outdated, {
            'production_state': state
            })
@classmethod
def set_drop_state(cls, records):
    """Update the ``dropped`` flag from the done dropped quantity.

    A dropped record without drop moves is undropped. Otherwise the done
    dropped quantity is compared against the record quantities with a
    tolerance of 1.0. Raises UserError when the dropped quantity exceeds
    the record quantity by more than that tolerance.
    """
    to_drop = []
    to_undrop = []
    for record in records:
        if record.dropped and not record.drop_moves:
            to_undrop.append(record)
            continue
        done_qty = record._get_dropped_quantity(done=True)
        if (done_qty - record.quantity) > 1.0:
            raise UserError(gettext(
                'stock_unit_load.msg_stock_unit_load_wrong_dropped_qty',
                dropped_qty=done_qty,
                quantity=record.quantity,
                unit_load=record.rec_name))
        fully_dropped = abs(record.internal_quantity - done_qty) < 1.0
        if fully_dropped and not record.dropped:
            to_drop.append(record)
        elif done_qty < record.internal_quantity and record.dropped:
            to_undrop.append(record)

    if to_drop:
        cls.write(to_drop, {'dropped': True})
    if to_undrop:
        cls.write(to_undrop, {'dropped': False})
def _get_dropped_quantity(self, product=None, to_uom=None, done=False):
    """Return the quantity of ``product`` moved by the drop moves.

    :param product: product to consider, the unit load product if empty
    :param to_uom: UoM to convert to, the unit load UoM if empty
    :param done: when true only moves in state ``done`` are summed
    :return: the sum rounded with the unit load UoM, 0.0 without drop
        moves
    """
    if not self.drop_moves:
        return 0.0
    wanted = product or self.product
    total = 0
    for move in self.drop_moves:
        if move.product.id != wanted.id:
            continue
        if done and move.state != 'done':
            continue
        total += self.uom.compute_qty(
            move.uom, move.quantity, to_uom or self.uom)
    return self.uom.round(total)
@classmethod
def check_move_origin(cls, move):
    """Tell whether ``move`` can be handled from the unit load.

    True when the move has no origin or a ``stock.unit_load`` origin
    and, in addition, has no shipment.
    """
    origin = move.origin
    origin_ok = not origin or origin.__name__ == 'stock.unit_load'
    if not origin_ok:
        return False
    return not move.shipment
@classmethod
def check_dates(cls, records):
    """Align production move dates with the unit load dates.

    Only records whose production state is ``running`` are handled.
    Input moves get the unit load start date as both start and end
    date; moves leaving a production location get its start and end
    dates.
    """
    Move = Pool().get('stock.move')

    to_write = []
    for record in records:
        if record.production_state != 'running':
            continue
        inputs = record._get_input_moves_to_check_date()
        outputs = [move for move in record.production_moves
            if move.from_location.type == 'production']
        input_dates = {
            'start_date': record.start_date,
            'end_date': record.start_date
            }
        output_dates = {
            'start_date': record.start_date,
            'end_date': record.end_date
            }
        to_write += [inputs, input_dates, outputs, output_dates]
    if to_write:
        Move.write(*to_write)
def _get_input_moves_to_check_date(self):
    """Return the production moves going into a production location."""
    inputs = []
    for move in self.production_moves:
        if move.to_location.type == 'production':
            inputs.append(move)
    return inputs
@fields.depends('product', methods=['explode_production_moves'])
def on_change_product(self):
    """Take UoM and UoM category from the product and rebuild moves.

    Nothing is done while the product is empty.
    """
    if not self.product:
        return
    self.uom = self.product.default_uom
    self.uom_category = self.uom.category
    self.explode_production_moves()
@fields.depends(methods=['get_quantity_per_case',
        'explode_production_moves'])
def on_change_quantity(self):
    """Refresh quantity per case and rebuild the production moves."""
    per_case = self.get_quantity_per_case()
    self.quantity_per_case = per_case
    self.explode_production_moves()
@fields.depends(methods=['explode_production_moves'])
def on_change_production_location(self):
    """Rebuild the production moves for the new production location."""
    self.explode_production_moves()
@ModelView.button_change(*(MOVE_CHANGES + ['state', 'production_state']))
@fields.depends(methods=['explode_production_moves'])
def reset_production_moves(self):
    """Button: rebuild the production moves from the current values."""
    self.explode_production_moves()
@fields.depends('production_state', 'state',
    methods=['_explode_production_moves'])
def explode_production_moves(self):
    """Rebuild production moves unless production is done or the unit
    load is no longer in draft (an empty state counts as draft)."""
    if self.production_state == 'done':
        return
    current_state = self.state or 'draft'
    if current_state != 'draft':
        return
    self._explode_production_moves()
@fields.depends('production_moves',
    methods=['_get_production_move', '_check_production_data'])
def _explode_production_moves(self):
    """Replace the production moves with a single fresh move.

    The moves are emptied when the production data is incomplete. The
    first origin found on the current moves is kept on the new move.
    """
    if not self._check_production_data():
        self.production_moves = []
        return

    kept_origin = next(
        (m.origin for m in (self.production_moves or ()) if m.origin),
        None)
    new_move = self._get_production_move()
    new_move.origin = kept_origin
    self.production_moves = [new_move]
@fields.depends('company', 'production_location', 'warehouse', 'product',
    'quantity', 'start_date', 'end_date',
    methods=['on_change_product'])
def _get_production_move(self):
    """Build the draft move from the production location to the
    warehouse storage location.

    Every optional value (warehouse, product, company, start date) is
    guarded so the move can be built while the form is incomplete; an
    empty start date yields empty planned and effective dates instead
    of raising AttributeError.

    :return: an unsaved ``stock.move`` with ``on_change_product`` applied
    """
    pool = Pool()
    Move = pool.get('stock.move')

    start_day = self.start_date.date() if self.start_date else None
    storage = self.warehouse.storage_location if self.warehouse else None
    cost_price = (self.product.cost_price if self.product else None) or 0
    move = Move(company=self.company,
        from_location=self.production_location,
        to_location=storage,
        product=self.product,
        unit_price=cost_price,
        quantity=self.quantity,
        planned_date=start_day,
        effective_date=start_day,
        start_date=self.start_date,
        end_date=self.end_date,
        currency=self.company.currency if self.company else None,
        state='draft',
        origin=None)
    move.on_change_product()
    return move
@fields.depends('product', 'production_type', 'production_location',
    'quantity')
def _check_production_data(self):
    """Tell whether there is enough data to build production moves.

    Product and quantity are required, and so is the production
    location when the production type is ``location``.
    """
    if not self.product:
        return False
    if self.production_type == 'location' and not self.production_location:
        return False
    if not self.quantity:
        return False
    return True
@classmethod
def drop(cls, records, to_location, start_date, cases_quantity=None,
        extra_params=None):
    """ Drop unit loads over a production location
    :param records: list of records, a domain or a PYSON encoded string
        domain
    :param to_location: destination location or id. Must be production
        or drop type
    :param start_date: starting date, current time if empty
    :param cases_quantity: the quantity of cases to drop
    :param extra_params: dict extra params:
        end_date
        delay (added to the start date to get each end date; the
            next unit load starts where the previous one ends)
        done_moves (do the unit loads once the moves are saved)
        return_locs (dict with product_id as key and location_id
            as value)
    :return: None; the drop and return moves are saved as a side effect
    """
    pool = Pool()
    Move = pool.get('stock.move')
    Location = pool.get('stock.location')

    # work on a copy: 'end_date' is overwritten below when a delay is set
    values = dict(extra_params or {})
    _start = start_date or datetime.datetime.now()
    if isinstance(to_location, int):
        to_location = Location(to_location)
    if not records:
        return
    if isinstance(records, str):
        # decode is an instance method: a decoder must be instantiated
        records = PYSONDecoder().decode(records)
    if not isinstance(records[0], Model):
        records = cls.search(records)

    to_create = []
    to_do = []
    delay = values.get('delay')
    for record in records:
        if record.dropped:
            continue
        if delay:
            values['end_date'] = _start + delay
        drop_moves, return_moves = record._drop(
            to_location, _start, cases_quantity, **values)
        to_create.extend(drop_moves + return_moves)
        if delay:
            _start += delay
        to_do.append(record)

    if to_create:
        Move.save(to_create)
    if to_do and values.get('done_moves', False):
        cls.do(to_do)
def _drop(self, to_location, start_date, cases_quantity, **kwargs):
    """Build the moves needed to drop this unit load.

    The origin is the destination of the latest unit load move going to
    a storage location.

    :return: tuple (drop moves, return moves), none of them saved here
    """
    from_location = next(
        (ul_move.to_location for ul_move in reversed(self.ul_moves)
            if ul_move.to_location.type == 'storage'),
        None)
    self.check_to_move(from_location, to_location, start_date)

    drop_values = {
        'from_location': from_location.id,
        'to_location': to_location.id,
        'start_date': start_date,
        'end_date': kwargs.get('end_date', None),
        }
    new_moves = self._get_new_moves(drop_values,
        location_type='storage',
        cases_quantity=cases_quantity,
        **kwargs)
    return_moves = self._get_return_moves(to_location, start_date,
        cases_quantity=cases_quantity, **kwargs)
    return new_moves, return_moves
def _get_return_moves(self, from_location, start_date,
        cases_quantity=None, **kwargs):
    """Build the moves returning the non-main products after a drop.

    The move of the unit load's own product is discarded. When
    ``products`` is given in kwargs only those product ids are kept.
    The destination comes from ``return_locs`` (product id -> location)
    or else from ``_get_return_location``.

    Raises UserError when no return location can be found for a move.
    """
    end_date = kwargs.get('end_date', None)
    candidates = self._get_new_moves({
            'from_location': from_location.id,
            'start_date': end_date or start_date,
            'end_date': end_date
            },
        location_type='storage',
        cases_quantity=cases_quantity,
        **kwargs)

    # exactly one move is expected for the unit load product
    product_move, = [m for m in candidates
        if m.product.id == self.product.id]
    candidates.remove(product_move)

    if kwargs.get('products', []):
        candidates = [m for m in candidates
            if m.product.id in kwargs['products']]

    return_locs = kwargs.get('return_locs', {})
    for move in candidates:
        target = (return_locs.get(move.product.id, None)
            or self._get_return_location(move.product))
        if not target:
            raise UserError(gettext(
                'stock_unit_load.msg_stock_unit_load_missing_return_location',
                product=move.product.rec_name,
                unit_load=self.rec_name))
        move.to_location = target
    return candidates
def _get_return_location(self, product):
    """Return the warehouse storage location, or None without warehouse.

    ``product`` is not used by this implementation.
    """
    if not self.warehouse:
        return None
    return self.warehouse.storage_location
def get_drop_moves(self, name=None):
    """Return the ids of the drop moves.

    Drop moves are non-production moves going to a production or
    lost_found location from a location of any other type. The list is
    empty until production is done or while the unit load only has
    production moves.
    """
    if self.production_state != 'done' or not self.moves:
        return []
    if len(self.moves) == len(self.production_moves):
        return []

    sinks = ('production', 'lost_found')
    production_ids = [int(m) for m in self.production_moves]
    drop_ids = []
    for move in self.moves:
        if move.to_location.type not in sinks:
            continue
        if move.from_location.type in sinks:
            continue
        if move.id in production_ids:
            continue
        drop_ids.append(move.id)
    return drop_ids
def get_return_moves(self, name=None):
    """Return the ids of the return moves of the last drop.

    Return moves are non-production moves from a production or
    lost_found location to a storage location whose start date equals
    the latest date (end date, else start date) of the last moves.
    """
    if self.production_state != 'done':
        return []
    if len(self.moves) == len(self.production_moves):
        return []
    if not self.last_moves or not self.drop_moves:
        return []

    at_date = max(m.end_date or m.start_date for m in self.last_moves)
    production_ids = [int(m) for m in self.production_moves]
    sources = ('production', 'lost_found')
    return_ids = []
    for move in self.moves:
        if (move.from_location.type in sources
                and move.to_location.type == 'storage'
                and move.id not in production_ids
                and move.start_date == at_date):
            return_ids.append(move.id)
    return return_ids
@staticmethod
def _get_shipments():
    'Return list of Model names for shipment Reference'
    suffixes = ('out', 'out.return', 'internal', 'in.return')
    return ['stock.shipment.%s' % suffix for suffix in suffixes]
@classmethod
def get_shipments(cls):
    """Return the selection of shipment models: an empty entry followed
    by (model, name) for each model of ``_get_shipments`` found."""
    IrModel = Pool().get('ir.model')
    model_names = cls._get_shipments()
    found = IrModel.search([
            ('model', 'in', model_names),
            ])
    selection = [(None, '')]
    selection.extend((m.model, m.name) for m in found)
    return selection
@classmethod
def set_shipment(cls, records):
    """Set the shipment of each record from its first last move of the
    unit load product, then save all the records."""
    records = cls.browse(records)
    for record in records:
        product_move = next(
            (m for m in record.last_moves if m.product == record.product),
            None)
        if product_move is not None:
            record._set_shipment(product_move)
    cls.save(records)
def _set_shipment(self, move):
    """Copy the shipment of ``move`` onto the unit load (not saved)."""
    shipment = move.shipment
    self.shipment = shipment
@classmethod
@ModelView.button_action('stock_unit_load.wizard_drop_unit_load')
def drop_wizard(cls, records):
    """Button bound to the drop unit load wizard action; no own logic."""
    return None
@classmethod
def drop_try(cls, records, to_location, start_date, cases_quantity,
        **kwargs):
    """Try to drop ``records``, reporting failure instead of raising.

    Each record is first checked with ``check_to_move`` from its latest
    storage destination (or its current location when it has none);
    errors raised by that check are propagated. The drop itself is then
    attempted with ``kwargs`` passed as ``extra_params``.

    :return: True when dropped, False when the drop raised UserError
    """
    # TODO: could be optimized (drop method call 'check_to_move' again)
    for record in records:
        # reset per record: otherwise the location of the previous
        # record leaks in, or the name is unbound on the first one
        from_location = None
        for move in reversed(record.ul_moves):
            if move.to_location.type == 'storage':
                from_location = move.to_location
                break
        if not from_location:
            from_location = record.location
        record.check_to_move(from_location, to_location, start_date)
    try:
        cls.drop(records, to_location, start_date, cases_quantity,
            extra_params=kwargs)
    except UserError:
        return False
    return True
@classmethod
def export_data(cls, records, fields_names):
    """Export data converting datetime values to the company timezone.

    Values are taken as UTC and returned as naive datetimes in the
    timezone of the context company (UTC when it has none).
    """
    Company = Pool().get('company.company')

    rows = super(UnitLoad, cls).export_data(records, fields_names)
    company = Company(Transaction().context['company'])
    if company.timezone:
        local_zone = dateutil.tz.gettz(company.timezone)
    else:
        local_zone = dateutil.tz.tzutc()
    utc_zone = dateutil.tz.tzutc()

    for row in rows:
        for position, value in enumerate(row):
            if not isinstance(value, datetime.datetime):
                continue
            localized = value.replace(tzinfo=utc_zone).astimezone(local_zone)
            row[position] = localized.replace(tzinfo=None)
    return rows
@classmethod
def _get_production_type_fields(cls):
    """Return a dict mapping each production type to its field name."""
    mapping = {}
    mapping['location'] = 'production_location'
    return mapping
@classmethod
def auto_drop(cls, ul_code, location_code):
    """Drop and do, at the current time, the unit loads with ``ul_code``
    on the location with ``location_code``.

    :return: the codes of the dropped unit loads, or None when no
        location matches ``location_code``
    :raises UserError: when the location exists but no unit load
        matches ``ul_code``
    """
    pool = Pool()
    UnitLoad = pool.get('stock.unit_load')
    Location = pool.get('stock.location')

    locations = Location.search([('code', '=', location_code)], limit=1)
    if not locations:
        return None
    location, = locations
    uls = UnitLoad.search([('code', '=', ul_code)])
    if not uls:
        raise UserError(gettext(
            'stock_unit_load.msg_stock_unit_load_do_drop_invalid_ul',
            unit_load=ul_code))
    UnitLoad.drop(
        uls, location, datetime.datetime.now(),
        extra_params={'done_moves': True})
    return [ul.code for ul in uls]
@property
def has_pallet(self):
    """Whether any product of the last moves is in the configured
    pallet category. False without that category or without last moves."""
    Conf = Pool().get('stock.configuration')
    pallet_category = Conf(1).pallet_category
    if not (pallet_category and self.last_moves):
        return False
    for move in self.last_moves:
        if pallet_category in move.product.categories:
            return True
    return False
def get_production_time(self, name=None):
    """Return end date minus start date with the microseconds removed,
    or None when either date is missing."""
    if not (self.start_date and self.end_date):
        return None
    elapsed = self.end_date - self.start_date
    microseconds = datetime.timedelta(microseconds=elapsed.microseconds)
    return elapsed - microseconds
@classmethod
def _get_barcode_search_domain(cls, code):
    """Return the domain matching unit loads whose code equals ``code``."""
    clause = ('code', '=', code)
    return [clause]
class UnitLoadMove(ModelSQL, ModelView):
    """Unit load movement"""
    __name__ = 'stock.unit_load.move'

    # Read-only view over stock moves, see table_query
    unit_load = fields.Many2One('stock.unit_load', 'UL')
    start_date = fields.DateTime('Start date')
    end_date = fields.DateTime('End date')
    state = fields.Selection([
            ('staging', 'Staging'),
            ('draft', 'Draft'),
            ('assigned', 'Assigned'),
            ('done', 'Done'),
            ('cancelled', 'Cancelled')], 'State')
    from_location = fields.Many2One('stock.location', 'From location')
    to_location = fields.Many2One('stock.location', 'To location')
    quantity = fields.Float('Quantity', digits=(16, Eval('uom_digits', 2)),
        depends=['uom_digits'])
    uom = fields.Function(
        fields.Many2One('product.uom', 'UOM'), 'get_uom')
    uom_digits = fields.Function(
        fields.Integer('UOM digits'), 'get_uom_digits')
    product = fields.Many2One('product.product', 'Product')

    @classmethod
    def table_query(cls):
        """Return the query backing this model.

        Stock moves of the unit load product are grouped by start date,
        end date, unit load, state, locations and product; quantities
        are summed. Moves without effective nor planned date are left
        out, and the ``unit_load`` context key restricts the result to
        one unit load.
        """
        pool = Pool()
        Move = pool.get('stock.move')
        UL = pool.get('stock.unit_load')
        move = Move.__table__()
        ul = UL.__table__()

        where = (Coalesce(move.effective_date, move.planned_date) != Null)
        unit_load = Transaction().context.get('unit_load', None)
        if unit_load:
            where &= (move.unit_load == unit_load)

        if backend.name == 'sqlite':
            # sqlite: the start date is built by text concatenation
            date_column = Concat(Coalesce(move.effective_date,
                    move.planned_date), Concat(' ', move.time_))
        else:
            date_column = (Coalesce(move.effective_date,
                    move.planned_date) + move.time_)

        joined = move.join(ul, condition=(
                (move.product == ul.product) & (move.unit_load == ul.id)))
        grouping = (date_column, move.end_date, move.unit_load,
            move.state, move.from_location, move.to_location,
            move.product)
        return joined.select(
            Max(move.id).as_('id'),
            move.unit_load,
            move.state,
            move.from_location,
            move.to_location,
            Literal(0).as_('create_uid'),
            Max(move.create_date).as_('create_date'),
            Literal(None).as_('write_uid'),
            Literal(None).as_('write_date'),
            date_column.as_('start_date'),
            move.end_date,
            Sum(move.internal_quantity).as_('quantity'),
            move.product,
            where=where,
            group_by=grouping)

    @property
    def start_date_format(self):
        """Start date as datetime, parsing it when stored as a string."""
        value = self.start_date
        if not isinstance(value, str):
            return value
        return datetime.datetime.strptime(value, '%Y-%m-%d %H:%M:%S')

    @classmethod
    def get_uom(cls, records, name=None):
        """Return, per record id, the id of the unit load UoM."""
        result = {}
        for record in records:
            result[record.id] = record.unit_load.uom.id
        return result

    @classmethod
    def get_uom_digits(cls, records, name=None):
        """Return, per record id, the digits of the unit load UoM."""
        result = {}
        for record in records:
            result[record.id] = record.unit_load.uom.digits
        return result

    @classmethod
    def read(cls, ids, fields_names=None):
        """Read records turning string start dates into datetimes."""
        rows = super(UnitLoadMove, cls).read(
            ids, fields_names=fields_names)
        # Convert str start_date to datetime in sqlite
        for row in rows:
            if 'start_date' not in row:
                continue
            raw = row['start_date']
            if isinstance(raw, str):
                parsed = time.strptime(raw, '%Y-%m-%d %H:%M:%S')
                row['start_date'] = datetime.datetime(*parsed[:6])
        return rows
class MoveUnitLoadStart(ModelView):
    """Start moving unit load"""
    __name__ = 'stock.unit_load.do_move_start'

    planned_date = fields.DateTime('Planned date', required=True)
    # destination, restricted to storage locations
    location = fields.Many2One('stock.location', 'Location', required=True,
        domain=[('type', '=', 'storage')])

    @staticmethod
    def default_planned_date():
        """Default the planned date to the current time."""
        now = datetime.datetime.now()
        return now
class MoveUnitLoad(Wizard):
    """Move Unit load"""
    __name__ = 'stock.unit_load.do_move'

    start = StateView('stock.unit_load.do_move_start',
        'stock_unit_load.unit_load_move_start_view_form',
        [Button('Cancel', 'end', 'tryton-cancel'),
         Button('OK', 'move_', 'tryton-ok', default=True)])
    move_ = StateTransition()

    def transition_move_(self):
        """Move the active unit loads to the chosen location at the
        planned date and end the wizard."""
        Ul = Pool().get('stock.unit_load')

        active_ids = Transaction().context.get('active_ids')
        unit_loads = Ul.search([('id', 'in', active_ids)])
        Ul.move(unit_loads,
            to_location=self.start.location,
            at_date=self.start.planned_date)
        return 'end'
class DropUnitLoadData(ModelView):
    """Drop Unit Load data"""
    __name__ = 'stock.unit_load.do_drop.data'

    unit_load = fields.Many2One('stock.unit_load', 'Unit load', readonly=True)
    start_date = fields.DateTime('Start date', required=True)
    # the end date cannot be edited for parallel drops
    end_date = fields.DateTime('End date',
        states={
            'readonly': Bool(Eval('parallel')),
            },
        depends=['parallel'])
    # drop destination: production or lost_found locations only
    location = fields.Many2One('stock.location', 'Location', required=True,
        domain=[('type', 'in', ('production', 'lost_found'))])
    cases_quantity = fields.Float('Cases', readonly=True,
        digits=(16, Eval('cases_digits', 2)),
        depends=['cases_digits'])
    available_cases_quantity = fields.Float('Available cases', readonly=True,
        digits=(16, Eval('cases_digits', 2)),
        depends=['cases_digits'])
    # must be positive and not above the available cases
    drop_cases_quantity = fields.Float('Cases to drop', required=True,
        digits=(16, Eval('cases_digits', 2)),
        domain=[
            ('drop_cases_quantity', '>', 0),
            ('drop_cases_quantity', '<=', Eval('available_cases_quantity'))
            ],
        depends=['cases_digits', 'available_cases_quantity'])
    cases_digits = fields.Integer('Cases Digits')
    # only shown when the end date proposal is 'last_drop'
    parallel = fields.Boolean('Parallel',
        states={
            'invisible': Not(Equal(Eval('propose_drop_end_date'), 'last_drop'))
            },
        depends=['propose_drop_end_date'])
    propose_drop_end_date = fields.Selection('_get_propose_drop_end_date',
        'Propose drop UL end date')

    @fields.depends('parallel', 'end_date')
    def on_change_parallel(self):
        """Clear the end date when the drop is marked as parallel."""
        if not self.parallel:
            return
        if self.end_date:
            self.end_date = None

    @classmethod
    def _get_propose_drop_end_date(cls):
        """Reuse the selection of the production configuration field."""
        Conf = Pool().get('production.configuration')
        field = Conf.propose_drop_end_date
        return field.selection
class DropUnitLoadFailedProduct(ModelView):
    """Drop unit load product data"""
    __name__ = 'stock.unit_load.do_drop_failed.product'

    # one line per product, with the storage location it returns to
    product = fields.Many2One('product.product', 'Product', required=True)
    location = fields.Many2One('stock.location', 'Return location',
        required=True,
        domain=[('type', '=', 'storage')])
class DropUnitLoadFailed(ModelView):
    """Drop unit load data"""
    __name__ = 'stock.unit_load.do_drop_failed'

    # changing this location copies it to every product line
    location = fields.Many2One('stock.location', 'Return location',
        domain=[('type', '=', 'storage')])
    products = fields.One2Many('stock.unit_load.do_drop_failed.product',
        None, 'Products')

    @fields.depends('location', 'products')
    def on_change_location(self):
        """Apply the chosen return location to all the product lines."""
        if not self.location:
            return
        lines = list(self.products)
        for line in lines:
            line.location = self.location
        self.products = lines
class DropUnitLoadUL(ModelView):
    """Drop Unit load UL"""
    __name__ = 'stock.unit_load.do_drop.unit_load'

    # code typed by the user to pick the unit load to drop
    ul_code = fields.Char('Unit load Code', required=True)
class DropUnitLoadEndDate(ModelView):
    """Drop Unit load UL"""
    __name__ = 'stock.unit_load.do_drop.end_date'

    unit_load = fields.Many2One('stock.unit_load', 'Unit load', readonly=True)
    cases_quantity = fields.Float('Cases', readonly=True,
        digits=(16, Eval('cases_digits', 2)),
        depends=['cases_digits'])
    available_cases_quantity = fields.Float('Available cases', readonly=True,
        digits=(16, Eval('cases_digits', 2)),
        depends=['cases_digits'])
    # must be positive and not above the available cases
    drop_cases_quantity = fields.Float('Dropped cases', required=True,
        digits=(16, Eval('cases_digits', 2)),
        domain=[
            ('drop_cases_quantity', '>', 0),
            ('drop_cases_quantity', '<=', Eval('available_cases_quantity'))
            ],
        depends=['cases_digits', 'available_cases_quantity'])
    # always hidden
    initial_drop_cases_quantity = fields.Float('Initial Cases to drop',
        digits=(16, Eval('cases_digits', 2)),
        domain=[
            ('drop_cases_quantity', '>', 0)
            ],
        states={
            'invisible': Bool(True)
            },
        depends=['cases_digits'])
    cases_digits = fields.Integer('Cases Digits')
    start_date = fields.DateTime('Start date')
    # when set, the end date cannot precede the start date; the field is
    # only shown when end_date_edit is set
    end_date = fields.DateTime('End date',
        domain=[
            If(Eval('end_date'),
                ('end_date', '>=', Eval('start_date')),
                ())
            ],
        states={
            'invisible': Not(Bool(Eval('end_date_edit')))
            },
        depends=['end_date_edit', 'start_date'],
        help="Determines the drop end date. Blank to use current time.")
    end_date_edit = fields.Boolean('End date edit')
class DropUnitLoad(Wizard):
"""Drop Unit load"""
__name__ = 'stock.unit_load.do_drop'
start = StateTransition()
unit_load = StateView('stock.unit_load.do_drop.unit_load',
'stock_unit_load.unit_load_drop_unit_load_view_form',
[Button('Cancel', 'end', 'tryton-cancel'),
Button('OK', 'pre_data', 'tryton-ok', default=True)])
pre_data = StateTransition()
data = StateView('stock.unit_load.do_drop.data',
'stock_unit_load.unit_load_drop_data_view_form', [
Button('Cancel', 'end', 'tryton-cancel'),
Button('Set return locations', 'failed', 'tryton-forward'),
Button('OK', 'try_', 'tryton-ok', default=True)])
try_ = StateTransition()
end_date = StateView('stock.unit_load.do_drop.end_date',
'stock_unit_load.unit_load_drop_end_date_view_form', [
Button('Cancel', 'end', 'tryton-cancel'),
Button('Go back', 'go_back', 'tryton-back'),
Button('OK', 'do_', 'tryton-ok', default=True)])
do_ = StateTransition()
go_back = StateTransition()
failed = StateView('stock.unit_load.do_drop_failed',
'stock_unit_load.unit_load_drop_failed_view_form', [
Button('Cancel', 'end', 'tryton-cancel'),
Button('OK', 'force', 'tryton-ok', default=True)])
force = StateTransition()
@property
def current_ul(self):
pool = Pool()
Unitload = pool.get('stock.unit_load')
if Transaction().context.get('active_model') == Unitload.__name__:
return Unitload(Transaction().context['active_id'])
ul = Unitload.search(Unitload._get_barcode_search_domain(
self._get_ul_code()), limit=1)
if not ul:
raise UserError(gettext(
'stock_unit_load.msg_stock_unit_load_do_drop_invalid_ul',
unit_load=self.unit_load.ul_code))
return ul[0]
def _get_ul_code(self):
return self.unit_load.ul_code
def transition_start(self):
pool = Pool()
Unitload = pool.get('stock.unit_load')
if Transaction().context.get('active_model') == Unitload.__name__:
unit_load = Unitload(Transaction().context['active_id'])
if unit_load.dropped or not unit_load.available:
return 'end'
if unit_load.drop_moves and \
unit_load.drop_moves[-1].state != 'done':
return 'end_date'
return 'data'
return 'unit_load'
def transition_pre_data(self):
unit_load = self.current_ul
if unit_load.drop_moves and \
unit_load.drop_moves[-1].state != 'done':
return 'end_date'
return 'data'
def default_data(self, fields):
pool = Pool()
tran = Transaction()
Conf = pool.get('production.configuration')
User = pool.get('res.user')
Unitload = pool.get('stock.unit_load')
user = User(tran.user)
conf = Conf(1)
drop_date = datetime.datetime.now()
unit_load = self.current_ul
res = unit_load and {
'unit_load': unit_load.id,
'start_date': drop_date,
'location': (user.ul_drop_location.id
if user.ul_drop_location else None),
'cases_quantity': unit_load.cases_quantity,
'available_cases_quantity': unit_load.available_cases_quantity,
'drop_cases_quantity': unit_load.available_cases_quantity,
'cases_digits': unit_load.cases_digits,
'propose_drop_end_date': conf.propose_drop_end_date,
}
if (tran.context.get('active_model') == Unitload.__name__
or conf.propose_drop_end_date == 'start_date'):
res['end_date'] = drop_date
return res
def default_end_date(self, fields):
unit_load = self.current_ul
values = {
'unit_load': unit_load.id,
'cases_quantity': unit_load.cases_quantity,
'end_date_edit': False
}
if not getattr(self.data, 'available_cases_quantity', None):
move = [m for m in unit_load.drop_moves
if m.product == unit_load.product and
m.state != 'done'
]
if move:
move, = move
values.update({
'end_date_edit': True,
'start_date': move.start_date,
'end_date': move.end_date
})
drop_cases = round((
(move.internal_quantity or move.quantity) /
unit_load.internal_quantity) * unit_load.cases_quantity,
unit_load.cases_digits)
available_cases = unit_load.available_cases_quantity + \
drop_cases
else:
available_cases = self.data.available_cases_quantity
drop_cases = self.data.drop_cases_quantity
values['start_date'] = self.data.start_date
values.update({
'available_cases_quantity': available_cases,
'drop_cases_quantity': drop_cases,
'initial_drop_cases_quantity': drop_cases
})
return values
def transition_go_back(self):
pool = Pool()
Unitload = pool.get('stock.unit_load')
Unitload.cancel([self.current_ul])
return 'data'
def transition_do_(self):
pool = Pool()
Move = pool.get('stock.move')
Unitload = pool.get('stock.unit_load')
now = datetime.datetime.now()
end_date = self.end_date.end_date or now
unit_load = self.current_ul
moves = [m for m in unit_load.drop_moves
if m.state != 'done' and m.end_date < now
]
if moves:
if self.end_date.drop_cases_quantity != \
self.end_date.initial_drop_cases_quantity:
if getattr(self.data, 'location', None):
location = self.data.location
start_date = self.data.start_date
else:
location = moves[0].to_location
start_date = moves[0].start_date
# if cases to drop changes recreate moves
return_moves = [m for m in unit_load.return_moves
if m.state != 'done' and m.end_date < now
]
return_locs = {m.product.id: m.to_location.id
for m in return_moves if m.product != unit_load.product}
Move.delete(moves + return_moves)
Unitload.drop([unit_load],
to_location=location,
start_date=start_date,
cases_quantity=self.end_date.drop_cases_quantity,
extra_params={
'end_date': end_date,
'return_locs': return_locs
})
else:
Move.write(moves, {'end_date': end_date})
Unitload.do([unit_load])
if Transaction().context.get('active_model') == Unitload.__name__:
return 'end'
return 'unit_load'
def transition_try_(self):
pool = Pool()
Unitload = pool.get('stock.unit_load')
Move = pool.get('stock.move')
Configuration = pool.get('stock.configuration')
ProductionConfiguration = pool.get('production.configuration')
configuration = Configuration(1)
prod_conf = ProductionConfiguration(1)
do = (self.data.end_date and configuration.do_ul_drop)
if Unitload.drop_try([self.current_ul],
to_location=self.data.location,
start_date=self.data.start_date,
end_date=self.data.end_date,
cases_quantity=self.data.drop_cases_quantity,
done_moves=do):
if (prod_conf.propose_drop_end_date == 'last_drop'
and not self.data.parallel):
moves = Move.search(self.get_last_drop_moves_domain())
moves = sorted(moves, key=lambda m: m.start_date, reverse=True)
to_write = []
for move in moves:
if move.end_date:
break
to_write.append(move)
if to_write:
Move.write(to_write, {'end_date': self.data.start_date})
uls = list(set(m.unit_load for m in to_write
if m.state != 'done'))
if uls:
Unitload.do(uls)
elif (not self.data.end_date
and not prod_conf.propose_drop_end_date):
return 'end_date'
if Transaction().context.get('active_model') == Unitload.__name__:
return 'end'
return 'unit_load'
return 'failed'
def default_failed(self, fields):
ul = self.current_ul
res = {'products': []}
for move in ul.last_moves:
if move.product.id != ul.product.id:
res['products'].append({'product': move.product.id})
return res
def transition_force(self):
    """Drop the current unit load using the return locations chosen by
    the user in the ``failed`` view.

    Returns ``'end_date'`` when no end date was entered, ``'end'``
    otherwise.
    """
    UnitLoad = Pool().get('stock.unit_load')
    data = self.data

    # Map each listed product to the location selected for it.
    return_locs = {}
    for failed_product in self.failed.products:
        return_locs[failed_product.product.id] = failed_product.location.id

    extra_params = {
        # The current time is passed as end date when none was entered.
        'end_date': data.end_date or datetime.datetime.now(),
        'return_locs': return_locs,
    }
    UnitLoad.drop(
        [self.current_ul],
        to_location=data.location,
        start_date=data.start_date,
        cases_quantity=data.drop_cases_quantity,
        extra_params=extra_params)
    return 'end' if data.end_date else 'end_date'
def get_last_drop_moves_domain(self):
    """Return the search domain for earlier drop moves of the same day.

    The domain selects moves linked to a unit load, going to the wizard's
    location, started between midnight of the drop's start day (inclusive)
    and the drop's start date (exclusive), and whose end date is either
    unset or earlier than the wizard's end date.
    """
    data = self.data
    drop_start = data.start_date
    day_start = datetime.datetime.combine(
        drop_start.date(), datetime.time.min)
    end_date_clause = [
        'OR',
        ('end_date', '=', None),
        ('end_date', '<', data.end_date),
    ]
    return [
        ('unit_load', '!=', None),
        ('to_location', '=', data.location),
        ('start_date', '>=', day_start),
        ('start_date', '<', drop_start),
        end_date_clause,
    ]
# TODO: implement odt report
class UnitLoadLabel(CompanyReport):
    """Unit load label"""
    __name__ = 'stock.unit_load.label'

    @classmethod
    def get_context(cls, records, header, data):
        """Extend the parent report context with the label helpers.

        Adds the callables ``extra_info``, ``product_name``, ``counter``,
        ``origin`` and ``format_datetime``; each forwards to the
        classmethod of the same name.
        """
        report_context = super().get_context(records, header, data)
        report_context['extra_info'] = (
            lambda unit_load, language: cls.extra_info(unit_load, language))
        report_context['product_name'] = (
            lambda ul_id, language: cls.product_name(ul_id, language))
        report_context['counter'] = (
            lambda unit_load: cls.counter(unit_load))
        report_context['origin'] = (
            lambda unit_load, language: cls.origin(unit_load, language))
        report_context['format_datetime'] = (
            lambda value, company: cls.format_datetime(value, company))
        return report_context

    @classmethod
    def extra_info(cls, unit_load, language):
        """Return a list of tuples with extra info for the UL label.

        The list holds a single ``(production location name, '')`` tuple
        when the unit load has both a warehouse and a production location,
        and is empty otherwise.
        """
        with Transaction().set_context(language=language):
            if unit_load.warehouse and unit_load.production_location:
                return [(unit_load.production_location.rec_name, '')]
        return []

    @classmethod
    def product_name(cls, ul_id, language):
        """Return the product ``rec_name`` of the unit load with id
        ``ul_id``, read with ``language`` set in the context."""
        UnitLoad = Pool().get('stock.unit_load')
        with Transaction().set_context(language=language):
            return UnitLoad(ul_id).product.rec_name

    @classmethod
    def counter(cls, unit_load):
        """Return ``'<position> / <total>'`` of the unit load within the
        ids given by ``get_counter_data``, or ``''`` when that is empty."""
        _items = cls.get_counter_data(unit_load)
        if _items:
            return '%s / %s' % (_items.index(unit_load.id) + 1, len(_items))
        return ''

    @classmethod
    def get_counter_data(cls, unit_load):
        """Return the unit load ids used by ``counter``; empty here."""
        return []

    @classmethod
    def origin(cls, unit_load, language):
        """Return ``[(label, country code)]`` for the company's first address.

        The code is ``''`` when the company party has no address or when
        its first address has no country.
        """
        with Transaction().set_context(language=language):
            label = cls.get_translation('stock.move', 'origin')
            addresses = unit_load.company.party.addresses
            # A party without addresses must not break label rendering.
            country = addresses[0].country if addresses else None
            return [(label, country.code if country else '')]

    @staticmethod
    def get_translation(model, field):
        """Return the ``string`` of ``field`` as given by ``fields_get``
        on ``model``."""
        _Model = Pool().get(model)
        res = _Model.fields_get(fields_names=[field])
        return res[field]['string']

    @classmethod
    def format_datetime(cls, value, company):
        """Convert ``value``, taken as UTC, to the company timezone.

        UTC is used when the company has no timezone. The result is a
        naive datetime.
        """
        lzone = (dateutil.tz.gettz(company.timezone) if company.timezone
            else dateutil.tz.tzutc())
        szone = dateutil.tz.tzutc()
        return value.replace(tzinfo=szone).astimezone(
            lzone).replace(tzinfo=None)
class BatchDropUnitLoadData(ModelView):
    """Batch dropping UL data"""
    __name__ = 'stock.unit_load.batch_drop.data'

    # Destination location, limited to production and lost & found types.
    location = fields.Many2One(
        'stock.location', 'Location', required=True,
        domain=[('type', 'in', ['production', 'lost_found'])])
    start_date = fields.DateTime('Start date', required=True)
    # The end date may not be earlier than the start date.
    end_date = fields.DateTime(
        'End date', required=True,
        domain=[('end_date', '>=', Eval('start_date'))],
        depends=['start_date'])
    delay_ = fields.TimeDelta('Delay')
    # Only available unit loads can be selected.
    unit_loads = fields.Many2Many(
        'stock.unit_load', None, None, 'Unit loads',
        required=True, domain=[('available', '=', True)])

    @fields.depends('unit_loads', 'start_date', 'end_date')
    def on_change_with_delay_(self):
        """Return the start-to-end span divided by the number of unit
        loads, or ``None`` while any of the three values is missing."""
        if not self.start_date or not self.end_date or not self.unit_loads:
            return None
        elapsed = self.end_date - self.start_date
        return elapsed / len(self.unit_loads)

    @fields.depends('delay_', 'start_date', 'unit_loads')
    def on_change_delay_(self):
        """Set the end date to the start date plus one delay per unit
        load; does nothing while any of the three values is missing."""
        if not (self.delay_ and self.start_date and self.unit_loads):
            return
        total_delay = self.delay_ * len(self.unit_loads)
        self.end_date = self.start_date + total_delay
class BatchDropUnitLoadConfirm(ModelView):
    """Batch dropping UL confirm"""
    __name__ = 'stock.unit_load.batch_drop.confirm'

    # Read-only list of unit loads shown on the confirmation view.
    unit_loads = fields.One2Many(
        'stock.unit_load', None, 'Unit loads', readonly=True)
class BatchDropUnitLoad(Wizard):
    """Massive UL dropping"""
    __name__ = 'stock.unit_load.batch_drop'

    start = StateTransition()
    data = StateView('stock.unit_load.batch_drop.data',
        'stock_unit_load.unit_load_batch_drop_data_view_form', [
            Button('Cancel', 'end', 'tryton-cancel'),
            Button('OK', 'confirm', 'tryton-ok', default=True)])
    confirm = StateView('stock.unit_load.batch_drop.confirm',
        'stock_unit_load.unit_load_batch_drop_confirm_view_form', [
            Button('Cancel', 'end', 'tryton-cancel'),
            # Hidden unless the user's groups include the batch drop group.
            Button('Confirm', 'do_', 'tryton-ok', default=True,
                states={'invisible': ~Id('stock_unit_load',
                    'group_unit_load_batch_drop').in_(Eval('context', {}
                    ).get('groups', []))})])
    do_ = StateTransition()

    @staticmethod
    def _active_unit_load_ids():
        """Return the active ids when the active model is the unit load.

        Returns ``None`` for any other active model, including a context
        without ``active_model``, and ``[]`` when ``active_ids`` is missing.
        """
        context = Transaction().context
        # Use .get() so a context lacking these keys does not raise KeyError.
        if context.get('active_model') != 'stock.unit_load':
            return None
        return context.get('active_ids') or []

    def transition_start(self):
        """Check that every selected unit load is available.

        Raises ``UserError`` for the first one that is not; otherwise
        moves on to the ``data`` state.
        """
        uls = self._get_unit_loads()
        for ul in uls:
            if not ul.available:
                raise UserError(gettext(
                    'stock_unit_load.msg_stock_unit_load_ul_not_available',
                    unit_load=ul.rec_name))
        return 'data'

    def _get_unit_loads(self):
        """Return the active unit load records, or ``[]`` when the wizard
        was not launched on the unit load model."""
        active_ids = self._active_unit_load_ids()
        if active_ids is None:
            return []
        UnitLoad = Pool().get('stock.unit_load')
        return UnitLoad.browse(active_ids)

    def default_data(self, fields):
        """Preselect the active unit loads in the ``data`` view."""
        active_ids = self._active_unit_load_ids()
        if active_ids is None:
            return {}
        return {'unit_loads': active_ids}

    def default_confirm(self, fields):
        """Pass the unit loads chosen in the ``data`` view to the
        ``confirm`` view."""
        return {'unit_loads': list(map(int, self.data.unit_loads))}

    def transition_do_(self):
        """Drop the confirmed unit loads at the chosen location.

        The drop gets the start date positionally plus the per-unit-load
        delay and ``done_moves=True`` as extra parameters. Nothing is
        dropped when the ``data`` view holds no unit loads.
        """
        pool = Pool()
        Unitload = pool.get('stock.unit_load')
        if not self.data.unit_loads:
            return 'end'
        uls = Unitload.browse(list(map(int, self.confirm.unit_loads)))
        Unitload.drop(uls, self.data.location,
            self.data.start_date, extra_params={
                'delay': self.data.delay_,
                'done_moves': True
            })
        return 'end'
# Case label report for unit loads.
class CaseLabel(CompanyReport):
    __name__ = "stock.unit_load.case_label"

    @classmethod
    def get_context(cls, records, header, data):
        """Extend the parent report context with an ``address`` helper
        that forwards to ``get_address``."""
        report_context = super().get_context(records, header, data)
        report_context['address'] = (
            lambda unit_load: cls.get_address(unit_load))
        return report_context

    @classmethod
    def get_address(cls, unit_load):
        """Return the warehouse address when the unit load has a
        warehouse, otherwise the result of ``address_get()`` on the
        company party."""
        warehouse = unit_load.warehouse
        if not warehouse:
            return unit_load.company.party.address_get()
        return warehouse.address