trytond-stock_unit_load/unit_load.py

1632 lines
60 KiB
Python

# The COPYRIGHT file at the top level of this repository contains the full
# copyright notices and license terms.
import time
import datetime
import dateutil
from functools import partial
from itertools import groupby
from sql import Literal, Null
from sql.aggregate import Max, Sum
from sql.conditionals import Coalesce
from sql.operators import Concat
from trytond import backend
from trytond.exceptions import UserError
from trytond.model import Model
from trytond.modules.company import CompanyReport
from trytond.pool import Pool
from trytond.pyson import (Not, Eval, Bool, Equal, Date, If, Id,
PYSONDecoder)
from trytond.model import fields, ModelView, ModelSQL
from trytond.transaction import Transaction
from trytond.modules.stock_move_time.stock import DATE_FORMAT
from trytond.wizard import Wizard, StateTransition, StateView, Button
# Names exported by this module.
__all__ = ['UnitLoad', 'UnitLoadMove', 'MoveUnitLoad',
'MoveUnitLoadStart', 'UnitLoadLabel', 'DropUnitLoadStart',
'DropUnitLoad', 'DropUnitLoadFailed', 'DropUnitLoadFailedProduct',
'BatchDropUnitLoad', 'BatchDropUnitLoadData', 'BatchDropUnitLoadConfirm']
# Unit load field names passed as dependencies to the on_change handlers and
# to the button_change that rebuild the production moves.
MOVE_CHANGES = ['product', 'uom', 'production_type', 'production_location',
'warehouse', 'production_moves', 'moves', 'production_state', 'company',
'quantity', 'start_date', 'end_date', 'cases_quantity']
class UnitLoad(ModelSQL, ModelView):
"""Unit load"""
__name__ = 'stock.unit_load'
_rec_name = 'code'
# Code of the unit load (used as record name); read-only in the client while
# `code_readonly` evaluates true, which is also the fallback when unset.
code = fields.Char('Code', required=True, select=True,
states={'readonly': Eval('code_readonly', True)},
depends=['code_readonly'])
# Functional flag driving the read-only state of `code`.
code_readonly = fields.Function(fields.Boolean('Code Readonly'),
'get_code_readonly')
# Length of `code`; filled in by create() and write().
code_length = fields.Integer('Code Length', select=True, readonly=True)
# Owning company; restricted to the company found in the context, if any.
# Editable only while state is draft and production is not done.
company = fields.Many2One('company.company', 'Company', required=True,
states={
'readonly': (Eval('state') != 'draft') | (
Eval('production_state') == 'done')},
domain=[('id', If(Eval('context', {}).contains('company'), '=', '!='),
Eval('context', {}).get('company', -1))],
depends=['state', 'production_state'], select=True)
# Main product carried by the unit load.
product = fields.Many2One('product.product', 'Product',
select=True, required=True,
ondelete='RESTRICT',
states={'readonly': (Eval('state') != 'draft') | (
Eval('production_state') == 'done')},
depends=['state', 'production_state'],)
# UoM helpers, all derived from the product's default UoM.
uom = fields.Function(fields.Many2One('product.uom', 'UOM'),
'on_change_with_uom')
uom_category = fields.Function(
fields.Many2One('product.uom.category', 'UOM Category'),
'on_change_with_uom_category')
uom_digits = fields.Function(fields.Integer('UOM Digits'),
'on_change_with_uom_digits')
# Computed quantity: read through get_quantity, stored through set_quantity
# (which writes `internal_quantity`) and searchable.
quantity = fields.Function(
fields.Float('Quantity', digits=(16, Eval('uom_digits', 2)),
states={'readonly': (Eval('state') != 'draft') | (
Eval('production_state') == 'done')},
depends=['uom_digits', 'state', 'production_state']),
'get_quantity', setter='set_quantity', searcher='search_quantity')
# Same getter/searcher as `quantity`, evaluated with the forecast context.
forecast_quantity = fields.Function(
fields.Float('Forecast Quantity', digits=(16, Eval('uom_digits', 2)),
depends=['uom_digits']),
'get_quantity', searcher='search_quantity')
# Stored quantity expressed in the product default UoM.
internal_quantity = fields.Float('Internal Quantity', readonly=True)
# Every stock move linked to the unit load, oldest first.
moves = fields.One2Many('stock.move', 'unit_load', 'Moves',
readonly=True,
order=[('effective_date', 'ASC'), ('time_', 'ASC'), ('id', 'ASC')],
domain=[('company', '=', Eval('company', -1))],
depends=['company'])
# Current location: destination of the last moves (see get_location).
location = fields.Function(
fields.Many2One('stock.location', 'Location', depends=['moves']),
'get_location')
# Locations holding a positive quantity of the unit load (see get_locations).
locations = fields.Function(
fields.One2Many('stock.location', None, 'Locations'),
'get_locations')
# Latest end date among the non-cancelled moves of the main product.
last_date = fields.Function(
fields.DateTime('Last date',
states={'readonly': Not(Equal(Eval('state'), 'draft'))},
depends=['state']),
'get_last_date')
# Moves sharing the latest end date and destination (see get_last_moves).
last_moves = fields.Function(
fields.One2Many('stock.move', None, 'Last moves',
domain=[('unit_load', '=', Eval('id'))],
depends=['id']),
'get_last_moves')
# Unit-load level moves; shown only once production is done.
ul_moves = fields.One2Many('stock.unit_load.move', 'unit_load',
'UL moves', readonly=True,
order=[('start_date', 'ASC'), ('end_date', 'ASC'), ('id', 'ASC')],
states={'invisible': Eval('production_state') != 'done'},
depends=['production_state'])
# State derived from the states of `ul_moves` (see get_state).
state = fields.Function(fields.Selection([
('staging', 'Staging'),
('draft', 'Draft'),
('assigned', 'Assigned'),
('done', 'Done'),
('cancel', 'Canceled')], 'State', select=True, readonly=True),
'get_state', searcher='search_state')
# How the unit load is produced; 'location' is the only choice declared here.
production_type = fields.Function(
fields.Selection([('location', 'Location')], 'Production type',
states={'readonly': (Eval('state') != 'draft') | (
Eval('production_state') == 'done')},
depends=['state', 'production_state']),
'get_production_type', setter='set_production_type')
# Warehouse receiving the produced goods.
warehouse = fields.Many2One('stock.location', 'Warehouse', select=True,
domain=[('type', '=', 'warehouse')],
states={'readonly': (Eval('state') != 'draft') | (
Eval('production_state') == 'done')},
depends=['state', 'production_state'])
# Production location of the selected warehouse.
warehouse_production = fields.Function(
fields.Many2One('stock.location', 'Warehouse production',
domain=[('type', '=', 'production')]),
'on_change_with_warehouse_production')
# Location the unit load is produced at; required and visible only for the
# 'location' production type and limited to children of the warehouse
# production location when a warehouse is set.
production_location = fields.Many2One('stock.location',
'Production location', select=True,
domain=[('type', '=', 'production'),
If(Bool(Eval('warehouse')),
('parent', 'child_of', Eval('warehouse_production')),
())],
states={
'readonly': (Eval('state') != 'draft') | (
Eval('production_state') == 'done'),
'required': Eval('production_type') == 'location',
'invisible': Eval('production_type') != 'location'
},
depends=['production_state', 'production_type', 'warehouse',
'warehouse_production', 'state'])
# 'done' once every production move is done or cancelled, else 'running'.
production_state = fields.Function(
fields.Selection([
('running', 'Running'),
('done', 'Done')], 'Production state',
depends=['production_moves']),
'get_production_state')
# Subset of `moves` that produce the unit load: outputs of the main product
# (production -> storage) plus moves of other products between production
# and storage, in either direction.
production_moves = fields.Function(
fields.One2Many('stock.move', 'unit_load', 'Production moves',
domain=[['OR',
[('product', '=', Eval('product')),
('from_location.type', '=', 'production'),
('to_location.type', '=', 'storage')],
[('product', '!=', Eval('product')),
['OR',
[('from_location.type', '=', 'production'),
('to_location.type', '=', 'storage')],
[('to_location.type', '=', 'production'),
('from_location.type', '=', 'storage')]]
]],
('company', '=', Eval('company', -1))],
states={'readonly': Eval('production_state') == 'done',
'invisible': Eval('production_state') == 'done'},
depends=['production_state', 'company', 'product']),
'get_production_moves', setter='set_production_moves')
# Whether the unit load can still be moved or dropped (see get_available).
available = fields.Function(fields.Boolean('Available',
depends=['locations']),
'get_available', searcher='search_available')
# Production time span; both default to the current time and are editable
# only while state is draft and production is not done.
start_date = fields.DateTime('Start date', required=True,
format=DATE_FORMAT, states={
'readonly': (Eval('production_state') == 'done') | (
Eval('state') != 'draft')},
depends=['production_state', 'state'])
end_date = fields.DateTime('End date', required=True, format=DATE_FORMAT,
states={'readonly': (Eval('production_state') == 'done') | (
Eval('state') != 'draft')},
depends=['production_state', 'state'])
# Moves of the dropping process (getters are outside this excerpt).
drop_moves = fields.Function(
fields.One2Many('stock.move', None, 'Drop moves'),
'get_drop_moves')
return_moves = fields.Function(
fields.One2Many('stock.move', None, 'Return moves'),
'get_return_moves')
# Type of the current location; never shown in the client.
location_type = fields.Function(
fields.Char('Location type', states={'invisible': Bool(True)},
depends=['location']),
'get_location_type')
# Stored flag maintained by set_drop_state() and drop().
dropped = fields.Boolean('Dropped', readonly=True)
shipment_out = fields.Function(
fields.Many2One('stock.shipment.out', 'Shiptment out'),
'get_shipment_out')
# Number of cases the unit load is made of.
cases_quantity = fields.Float('Cases', required=True,
digits=(16, Eval('cases_digits', 2)),
states={'readonly': (Eval('state') != 'draft') | (
Eval('production_state') == 'done')},
depends=['cases_digits', 'production_state', 'state'])
# quantity / cases_quantity, rounded with the unit load UoM.
quantity_per_case = fields.Function(
fields.Float('Quantity per Case', digits=(16, Eval('uom_digits', 0)),
states={'readonly': (Eval('state') != 'draft') | (
Eval('production_state') == 'done')},
depends=['uom_digits', 'production_state', 'state']),
'get_quantity_per_case', setter='set_quantity_per_case')
# Digits used to display `cases_quantity`.
cases_digits = fields.Function(fields.Integer('Cases Digits'),
'on_change_with_cases_digits')
@classmethod
def __setup__(cls):
    """Register the error messages and view buttons of the model."""
    super(UnitLoad, cls).__setup__()

    def _only_when_movable():
        # Shared definition of the two buttons that are hidden unless the
        # unit load is done, available and not dropped.
        return {
            'icon': 'tryton-go-next',
            'invisible': (Eval('state') != 'done') | (~Eval('available')) |
                Eval('dropped'),
            'depends': ['state', 'available', 'dropped']
            }

    def _state_button(icon, invisible):
        return {'icon': icon, 'invisible': invisible, 'depends': ['state']}

    messages = {}
    messages['missing_location'] = (
        'Cannot find current location of UL "%s" from its moves.')
    messages['wrong_move_location'] = (
        'Cannot move unit load "%s" to Location "%s". '
        'Check its movements.')
    messages['wrong_move_date'] = (
        'Cannot move unit load "%s" at date "%s" because later '
        'moves exist.')
    messages['wrong_state'] = (
        'Unit load "%s" must be in Done state before moving.')
    messages['state_origin'] = (
        'Cannot change state of UL "%s" due to its last moves '
        'come from "%s".')
    messages['missing_return_location'] = (
        'Cannot set location in returning move of product "%s" '
        'of UL "%s" during its dropping process.')
    messages['wrong_dropped_qty'] = (
        'Cannot drop more quantity (%s) than total quantity (%s) '
        'in UL "%s".')
    cls._error_messages.update(messages)

    buttons = {}
    buttons['move_try'] = _only_when_movable()
    buttons['assign'] = _state_button(
        'tryton-go-next', (Eval('state') != 'draft'))
    buttons['do'] = _state_button(
        'tryton-ok', Eval('state') != 'assigned')
    buttons['cancel'] = _state_button(
        'tryton-cancel', Eval('state').in_(['cancel', 'done']))
    buttons['draft'] = _state_button(
        'tryton-clear', ~Eval('state').in_(['assigned']))
    buttons['drop_wizard'] = _only_when_movable()
    cls._buttons.update(buttons)
@staticmethod
def default_code_readonly():
    """Default: read-only when the stock configuration has a UL sequence."""
    Configuration = Pool().get('stock.configuration')
    sequence = Configuration(1).unit_load_sequence
    return True if sequence else False
def get_code_readonly(self, name):
    """Getter of ``code_readonly``: always True."""
    readonly = True
    return readonly
@staticmethod
def default_company():
    """Default company: the ``company`` key of the transaction context."""
    context = Transaction().context
    return context.get('company')
@classmethod
def default_state(cls):
return 'draft'
@staticmethod
def default_start_date():
return datetime.datetime.now()
@staticmethod
def default_end_date():
return datetime.datetime.now()
@staticmethod
def default_dropped():
return False
@classmethod
def create(cls, vlist):
    """Create unit loads, filling in ``code`` and ``code_length``.

    Values without a code take the next number of the unit load sequence
    set on the stock configuration.  The caller's dictionaries are copied,
    not modified.
    """
    pool = Pool()
    Sequence = pool.get('ir.sequence')
    Configuration = pool.get('stock.configuration')

    to_create = [values.copy() for values in vlist]
    config = Configuration(1)
    for values in to_create:
        code = values.get('code')
        if not code:
            code = Sequence.get_id(config.unit_load_sequence.id)
            values['code'] = code
        values['code_length'] = len(code)
    return super(UnitLoad, cls).create(to_create)
@classmethod
def write(cls, *args):
    """Write records keeping ``code_length`` and internal quantity in sync.

    ``args`` alternates lists of records and value dictionaries.  When a
    dictionary sets a code, a copy carrying the matching ``code_length`` is
    written instead.  After writing, the internal quantity of every written
    record is refreshed.
    """
    new_args = []
    for records, values in zip(args[::2], args[1::2]):
        code = values.get('code')
        if code:
            values = dict(values, code_length=len(code))
        new_args.append(records)
        new_args.append(values)
    super(UnitLoad, cls).write(*new_args)
    for records in new_args[::2]:
        cls._set_internal_quantity(records)
@staticmethod
def _get_internal_quantity(quantity, uom, product):
    """Convert ``quantity`` from ``uom`` to the product default UoM, rounded."""
    pool = Pool()
    return pool.get('product.uom').compute_qty(
        uom, quantity, product.default_uom, round=True)
@classmethod
def copy(cls, uls, default=None):
    """Duplicate unit loads together with their production moves.

    Code, moves, UL moves and the dropped flag are reset on the copies; the
    production moves of each original are then copied onto its duplicate.
    """
    Move = Pool().get('stock.move')

    defaults = {} if default is None else default.copy()
    defaults.update(code=None, moves=None, ul_moves=None, dropped=False)

    copies = []
    for unit_load in uls:
        duplicate, = super(UnitLoad, cls).copy([unit_load], default=defaults)
        Move.copy(unit_load.production_moves,
            default={'unit_load': duplicate.id})
        copies.append(duplicate)
    return copies
@fields.depends('product')
def on_change_with_uom(self, name=None):
    """Return the id of the product default UoM, or None without product."""
    if not self.product:
        return None
    return self.product.default_uom.id
@fields.depends('product')
def on_change_with_uom_digits(self, name=None):
    """Return the digits of the product default UoM, 2 without product."""
    if not self.product:
        return 2
    return self.product.default_uom.digits
@fields.depends('uom')
def on_change_with_uom_category(self, name=None):
    """Return the id of the UoM category, or None without UoM."""
    if not self.uom:
        return None
    return self.uom.category.id
@fields.depends('start_date')
def on_change_start_date(self):
    """Align the end date with a newly entered start date."""
    start = self.start_date
    if start:
        self.end_date = start
@classmethod
def get_quantity(cls, records, name):
    """Getter of ``quantity`` and ``forecast_quantity``.

    With ``locations`` in the context the value comes from the stock query
    restricted to those locations.  Otherwise it is the stored internal
    quantity once production is done, or the sum of the production moves
    of the main product (converted to the unit load UoM) while running.
    """
    Uom = Pool().get('product.uom')
    location_ids = Transaction().context.get('locations')
    if location_ids:
        # quantity from the stock query based on location
        with Transaction().set_context(cls._quantity_context(name)):
            return cls._get_quantity(records, name, location_ids)

    def _produced(record):
        total = sum(
            Uom.compute_qty(move.uom, move.quantity, record.uom)
            for move in record.production_moves
            if move.product.id == record.product.id)
        return total or 0

    # quantity from the production moves
    result = {}
    for record in records:
        if record.production_state == 'done':
            result[record.id] = record.internal_quantity
        else:
            result[record.id] = _produced(record)
    return result
@classmethod
def _quantity_context(cls, name):
    """Return the context overrides used to compute the field ``name``.

    The original implementation looked up ``stock_date_end`` in the dict it
    had just created, so the clamp for ``quantity`` never ran and the
    forecast branch always replaced a date supplied by the caller.  The
    values are now read from the transaction context.

    :param name: ``'quantity'`` or ``'forecast_quantity'``
    :return: dict to be merged into the transaction context:
        - ``quantity``: ``stock_date_end`` is clamped to today when the
          context asks for a later date;
        - ``forecast_quantity``: ``forecast`` is enabled and
          ``stock_date_end`` falls back to ``datetime.date.max`` when the
          context does not supply one.
    """
    context = Transaction().context
    new_context = {}
    if name == 'quantity':
        date_end = context.get('stock_date_end')
        if date_end:
            # ir.date gives the server-side notion of "today"; the PYSON
            # Date imported by this module has no today().
            today = Pool().get('ir.date').today()
            if date_end > today:
                new_context['stock_date_end'] = today
    elif name == 'forecast_quantity':
        new_context['forecast'] = True
        if not context.get('stock_date_end'):
            new_context['stock_date_end'] = datetime.date.max
    return new_context
@classmethod
def _get_quantity(cls, records, name, location_ids):
location_ids, wh_to_add, storage_to_remove = \
cls._skip_warehouse_in_compute_quantities(location_ids)
uls = dict.fromkeys(map(int, records), 0)
ul_products = {r.id: r.product.id for r in records}
quantities = cls._compute_quantities(records, location_ids)
if wh_to_add:
for wh, storage in wh_to_add.iteritems():
for key in quantities:
if key[0] == storage:
quantities[(wh,) + key[1:]] = quantities[key]
if storage in storage_to_remove:
del quantities[key]
for key, quantity in quantities.iteritems():
if (key[-1] is not None and
key[-1] in uls and
key[1] == ul_products[key[-1]]):
uls[key[-1]] += quantity
return uls
@classmethod
def _compute_quantities(cls, records, location_ids, with_childs=False,
        product_id=None):
    """Run the stock quantity query grouped by product and unit load.

    The query is filtered on the ids of ``records`` and on ``product_id``
    when given, otherwise on the main product of each record.  Returns the
    result of ``Move.compute_quantities`` or an empty dict when there is no
    query to run.
    """
    Move = Pool().get('stock.move')
    if product_id:
        product_ids = [product_id for _ in records]
    else:
        product_ids = [record.product.id for record in records]
    options = {
        'with_childs': with_childs,
        'grouping': ('product', 'unit_load',),
        'grouping_filter': (product_ids, tuple(r.id for r in records)),
        }
    query = Move.compute_quantities_query(location_ids, **options)
    if query is None:
        return {}
    # TODO: we should filter by main product of UL (with a join or subselect)
    return Move.compute_quantities(query, location_ids, **options)
@classmethod
def _skip_warehouse_in_compute_quantities(cls, location_ids):
    """Swap warehouses for their storage location when the context asks so.

    Only acts when ``stock_skip_warehouse`` is set in the context.

    :return: tuple ``(location_ids, wh_to_add, storage_to_remove)`` where
        ``wh_to_add`` maps each replaced warehouse id to its storage id and
        ``storage_to_remove`` holds the storage ids that were not part of
        the requested locations.
    """
    Location = Pool().get('stock.location')
    storage_to_remove = set()
    wh_to_add = {}
    if not Transaction().context.get('stock_skip_warehouse'):
        return location_ids, wh_to_add, storage_to_remove

    remaining = set(location_ids)
    for location in Location.browse(list(remaining)):
        if location.type != 'warehouse':
            continue
        remaining.remove(location.id)
        storage_id = location.storage_location.id
        if storage_id not in remaining:
            storage_to_remove.add(storage_id)
        remaining.add(storage_id)
        wh_to_add[location.id] = storage_id
    return list(remaining), wh_to_add, storage_to_remove
@classmethod
def set_quantity(cls, records, name, value):
    """Setter of ``quantity``: store a non-empty value as internal quantity."""
    if not value:
        return
    cls.write(records, {'internal_quantity': value})
@classmethod
def search_quantity(cls, name, domain=None):
    """Searcher of the quantity fields, using the context ``locations``."""
    locations = Transaction().context.get('locations')
    return cls._search_quantity(name, locations, domain)
@classmethod
def _search_quantity(cls, name, location_ids, domain=None):
    """Translate a domain on a quantity field into a domain on ids.

    Builds the stock quantity query grouped by product and unit load, joins
    it with the unit load table on the main product, filters it with
    ``domain`` and returns ``[('id', 'in', ids)]``.  Without locations an
    empty domain is returned.
    """
    Move = Pool().get('stock.move')
    ul = cls.__table__()
    if not location_ids:
        return []
    location_ids = cls._skip_warehouse_in_compute_quantities(
        location_ids)[0]
    grouping = ('product', 'unit_load',)
    with Transaction().set_context(cls._quantity_context(name)):
        query = Move.compute_quantities_query(location_ids,
            with_childs=True, grouping=grouping, grouping_filter=None)
        quantity_field = getattr(cls, name)._field
        having_domain = quantity_field.convert_domain(
            domain, {None: (query, {})}, cls)
        on_main_product = (
            (query.unit_load == ul.id) & (query.product == ul.product))
        query = query.join(ul, condition=on_main_product).select(
            query.location, query.product, query.unit_load, query.quantity,
            where=having_domain)
        quantities = Move.compute_quantities(query, location_ids,
            with_childs=True, grouping=grouping, grouping_filter=None)
    # some keys may carry None as unit load
    record_ids = [key[-1] for key, _ in quantities.iteritems()
        if key[-1] is not None]
    return [('id', 'in', record_ids)]
@classmethod
def get_available(cls, records, name=None):
res = {}
for record in records:
res[record.id] = not record.dropped
if not res[record.id]:
continue
if all(m.state == 'cancel' for m in record.ul_moves):
res[record.id] = False
return res
@classmethod
def search_available(cls, name, clause):
reverse = {'=': '!=',
'!=': '='}
if clause[1] in reverse:
res = [('dropped', reverse[clause[1]], clause[2])]
return res
return []
@classmethod
def get_available_locations_domain(cls):
return [('type', '=', 'storage')]
@classmethod
def get_location(cls, records, name=None, product_id=None, type=None):
    """Getter of ``location``: destination of the first of the last moves.

    ``product_id`` and ``type`` are forwarded to ``get_last_moves`` as
    product and location type filters.  Records without last moves map to
    None.
    """
    Move = Pool().get('stock.move')
    locations = dict.fromkeys(map(int, records), None)
    for record in records:
        last_moves = record.get_last_moves(product_id=product_id,
            location_type=type)
        if not last_moves:
            continue
        locations[record.id] = Move(last_moves[0]).to_location.id
    return locations
def get_location_type(self, name=None):
    """Return the type of the current location, or None without location."""
    return self.location.type if self.location else None
@classmethod
def _get_locations(cls, records, product_id=None, type=None):
    """Map each unit load id to the location ids holding a positive quantity.

    Candidate locations are the destinations of the moves of the records,
    optionally restricted to location type ``type``.  Quantities are
    matched against ``product_id`` when given, else against the main
    product of each unit load.
    """
    ul_products = {}
    locations = {}
    destination_ids = []
    for record in records:
        ul_products[record.id] = (
            product_id if product_id else record.product.id)
        locations[record.id] = []
        for move in record.moves:
            if not type or move.to_location.type == type:
                destination_ids.append(move.to_location.id)
    quantities = cls._compute_quantities(
        records, list(set(destination_ids)), product_id=product_id)
    for key, quantity in quantities.iteritems():
        if quantity <= 0:
            continue
        if key[1] == ul_products[key[-1]]:
            locations[key[-1]].append(key[0])
    return locations
@classmethod
def get_locations(cls, records, name=None, product_id=None, type=None):
    """Getter of ``locations``: locations currently holding each unit load.

    The computation runs with ``active_test`` disabled, with ``forecast``
    enabled unless the context already defines it, and with
    ``stock_date_end`` set to ``datetime.date.max``.

    The overrides are passed to ``set_context`` instead of being written
    into ``Transaction().context`` as before, so they no longer leak into
    the rest of the transaction once the computation is over.

    :param records: unit loads to compute
    :param name: field name (unused)
    :param product_id: optional product to look for instead of the main one
    :param type: optional location type filter
    :return: dict mapping unit load id to a list of location ids
    """
    context = Transaction().context
    overrides = {'active_test': False}
    # NOTE(review): the key tested is 'stock_date_max' while the key set is
    # 'stock_date_end'; kept as found - confirm which one is intended.
    if not context.get('stock_date_max'):
        overrides['stock_date_end'] = datetime.date.max
    if 'forecast' not in context:
        overrides['forecast'] = True
    with Transaction().set_context(overrides):
        return cls._get_locations(records, product_id, type)
def get_last_date(self, name=None):
    """Return the latest end date of the live moves of the main product.

    Cancelled moves and moves of other products are ignored; None is
    returned when nothing remains.
    """
    if not self.moves:
        return None
    end_dates = [move.end_date for move in self.moves
        if move.state != 'cancel' and move.product.id == self.product.id]
    return max(end_dates) if end_dates else None
def get_last_moves(self, name=None, product_id=None, location_type=None):
    """Return the ids of the moves that brought the unit load where it is.

    The moves are scanned in order, skipping cancelled ones, other products
    (``product_id`` defaults to the main product), destinations of another
    type when ``location_type`` is given, and moves ending before the
    latest end date seen so far.  The end date and destination of the last
    move kept identify the result: every move of the unit load sharing
    both.
    """
    if not self.moves:
        return []
    wanted_product = product_id or self.product.id
    last_date = None
    last_location = None
    for move in self.moves:
        skip = (move.state == 'cancel'
            or move.product.id != wanted_product
            or (location_type and location_type != move.to_location.type)
            or (last_date and last_date > move.end_date))
        if skip:
            continue
        last_date = move.end_date
        last_location = move.to_location.id
    return [move.id for move in self.moves
        if move.end_date == last_date
        and move.to_location.id == last_location]
def get_production_type(self, name=None):
    """Return 'location' when a production location is set, else None."""
    return 'location' if self.production_location else None
@classmethod
def default_production_type(cls):
    """Default production type: the configured one, else 'location'."""
    Configuration = Pool().get('stock.configuration')
    configured = Configuration(1).ul_production_type
    if configured:
        return configured
    return 'location'
@classmethod
def set_production_type(cls, records, name, value):
pass
@fields.depends('warehouse')
def on_change_with_warehouse_production(self, name=None):
    """Return the id of the warehouse production location, if any."""
    if not self.warehouse:
        return None
    return self.warehouse.production_location.id
@classmethod
def default_production_state(cls):
return 'running'
def get_production_state(self, name=None):
    """Return 'done' when every production move is done or cancelled.

    Without production moves, or with any move in another state, the
    production is 'running'.
    """
    moves = self.production_moves
    if not moves:
        return 'running'
    for move in moves:
        if move.state not in ('done', 'cancel'):
            return 'running'
    return 'done'
def get_production_moves(self, name=None):
    """Return the ids of the moves that produce the unit load.

    For the 'location' production type these are the moves leaving the
    production location towards a storage location, plus the moves of
    other products entering it from a storage location.  As soon as one of
    them is done, only those starting at the unit load start date are kept.
    """
    if not self.moves:
        return []
    if self.production_type != 'location':
        return []
    production_id = self.production_location.id
    main_product_id = self.product.id

    def _is_output(move):
        return (move.from_location.type == 'production'
            and move.from_location.id == production_id
            and move.to_location.type == 'storage')

    def _is_input(move):
        return (move.to_location.id == production_id
            and move.product.id != main_product_id
            and move.from_location.type == 'storage')

    candidates = [m for m in self.moves if _is_output(m) or _is_input(m)]
    if any(m.state == 'done' for m in candidates):
        candidates = [m for m in candidates
            if m.start_date == self.start_date]
    return [m.id for m in candidates]
@classmethod
def set_production_moves(cls, records, name, value):
    """Setter of ``production_moves``: forward the actions to ``moves``.

    'remove' actions are turned into deletions so that production moves
    are never merely unlinked; all deletions are sent as one final
    'delete' action.
    """
    if not value:
        return
    removal = ('delete', 'remove')
    actions = [action for action in value if action[0] not in removal]
    # never unlink, always delete
    to_delete = []
    for action in value:
        if action[0] in removal:
            to_delete.extend(action[1])
    actions.append(('delete', to_delete))
    cls.write(records, {'moves': actions})
@classmethod
def move(cls, records, to_location, at_date):
    """Move unit loads to ``to_location`` at ``at_date``.

    Builds the new moves of every record, saves them together and returns
    them browsed again.
    """
    Move = Pool().get('stock.move')
    new_moves = []
    for record in records:
        new_moves += record._move(to_location, at_date)
    if new_moves:
        Move.save(new_moves)
    return Move.browse(map(int, new_moves))
def _move(self, to_location, at_date, from_location=None):
    """Check and build (unsaved) the moves taking the UL to ``to_location``.

    ``from_location`` defaults to the current location.  The new moves
    start and end at ``at_date``.
    """
    origin = from_location or self.location
    self.check_to_move(origin, to_location, at_date)
    values = {
        'from_location': origin.id,
        'to_location': to_location.id,
        'start_date': at_date,
        'end_date': at_date,
        }
    return self._get_new_moves(values)
def check_to_move(self, from_location, to_location, at_date):
    """Raise a user error when the unit load cannot be moved as requested.

    Checks, in order: a known origin, the Done state, a destination
    different from the origin and that no last move ends after
    ``at_date``.  When dropping (storage to production) a unit load that
    already has drop moves, the date is compared against the last moves
    into storage, so drops may overlap.
    """
    Move = Pool().get('stock.move')
    if not from_location:
        self.raise_user_error('missing_location', self.rec_name)
    if self.state != 'done':
        self.raise_user_error('wrong_state', self.rec_name)
    if to_location.id == from_location.id:
        self.raise_user_error('wrong_move_location',
            (self.rec_name, to_location.rec_name))

    latest = max(m.end_date for m in self.last_moves)
    dropping = (from_location.type == 'storage'
        and to_location.type == 'production')
    if dropping and self.drop_moves:
        # allow overlapped drops
        storage_move_ids = self.get_last_moves(location_type='storage')
        if storage_move_ids:
            latest = max(m.end_date for m in Move.browse(storage_move_ids))
    if latest > at_date:
        self.raise_user_error('wrong_move_date', (self.rec_name, at_date))
def _get_new_moves(self, default_values=None, location_type=None):
    """Build (without saving) the moves that relocate the unit load.

    One move is created per group of last moves sharing the key returned
    by ``_get_group_move_key``, carrying the summed quantity of the group.
    When building from storage on a unit load with drop moves, the already
    dropped quantity is subtracted.  Moves ending with an empty quantity
    are discarded.

    Changes over the previous implementation: the default argument is no
    longer a shared mutable dict, the dict given by the caller is copied
    instead of modified, and a missing ``start_date`` yields an empty
    planned date instead of an AttributeError.

    :param default_values: values applied to every new move
    :param location_type: only consider last moves into this location type
    :return: list of unsaved stock moves
    """
    Move = Pool().get('stock.move')
    values = dict(default_values or {})
    start_date = values.get('start_date')
    values.update({
        'unit_load': self.id,
        'origin': None,
        'shipment': None,
        'planned_date': start_date.date() if start_date else None,
        })

    last_moves = Move.browse(self.get_last_moves(
        location_type=location_type))
    if not values.get('from_location'):
        if not location_type:
            values['from_location'] = self.location.id
        elif last_moves:
            values['from_location'] = last_moves[0].to_location.id

    keyfunc = partial(self._get_group_move_key, last_moves)
    moves = []
    # groupby needs its input sorted by the grouping key
    for _, group in groupby(sorted(last_moves, key=keyfunc), key=keyfunc):
        group = list(group)
        move = self._get_new_move(group[0], values)
        move.quantity = self._get_quantity_to_move(group, move.uom)
        if location_type == 'storage' and self.drop_moves:
            dropped_qty = self._get_dropped_quantity(move.product, move.uom)
            if dropped_qty:
                move.quantity -= dropped_qty
        if move.quantity:
            moves.append(move)
    return moves
def _get_quantity_to_move(self, _grouped_moves, uom):
    """Return the summed quantity of the moves converted to ``uom``."""
    Uom = Pool().get('product.uom')
    total = 0
    for move in _grouped_moves:
        total += Uom.compute_qty(move.uom, move.quantity, uom)
    return total
def _get_new_move(self, move, default_values):
    """Create an unsaved move from ``default_values`` modelled on ``move``.

    Product, UoM, quantity, company, currency and unit price are taken
    from ``move``, as well as its lot when it has one.
    """
    Move = Pool().get('stock.move')
    new_move = Move(**default_values)
    copied = ('product', 'uom', 'quantity', 'company', 'currency',
        'unit_price')
    for fname in copied:
        setattr(new_move, fname, getattr(move, fname))
    lot = getattr(move, 'lot', None)
    if lot:
        new_move.lot = lot
    return new_move
def _get_group_move_key(self, moves, move):
    """Return the grouping key of ``move``: destination, product and lot.

    The lot id (or None) is only part of the key when looking up
    'stock.lot' in the pool does not raise KeyError.
    """
    key = (move.to_location.id, move.product.id)
    try:
        # presumably fails with KeyError when lots are not installed
        Pool().get('stock.lot')
        lot_id = move.lot.id if move.lot else None
    except KeyError:
        return key
    return key + (lot_id, )
def get_state(self, name=None):
    """Return the state of the unit load derived from its UL moves.

    The least advanced state among the UL moves wins, in the order
    staging, draft, assigned, done, cancel.  A unit load is therefore done
    only when no move is pending, and cancelled only when all its moves
    are.  Without UL moves the default state of stock moves is used.

    'staging' was missing from the ranking although the field offers it,
    which raised KeyError as soon as a UL move was in that state.
    """
    if not self.ul_moves:
        return Pool().get('stock.move').default_state()
    priority = {
        'staging': -1,
        'draft': 0,
        'assigned': 1,
        'done': 2,
        'cancel': 3,
        }
    states = set(move.state for move in self.ul_moves)
    return min(states, key=priority.__getitem__)
@classmethod
def search_state(cls, name, clause):
    """Searcher of ``state``: return a domain on ids backed by a subquery.

    The subquery left-joins the unit loads with their moves of the main
    product, ignores cancelled moves, counts a missing move state as
    'draft' and groups by unit load and state; the operator and value of
    ``clause`` are then applied to that state.
    """
    Move = Pool().get('stock.move')
    sql_type = cls.state._field.sql_type().base
    move = Move.__table__()
    ul = cls.__table__()
    Operator = fields.SQL_OPERATORS[clause[1]]

    def _move_state():
        return Coalesce(move.state, 'draft')

    last_date = Max(Coalesce(move.effective_date, move.planned_date)
        ).as_('date')
    on_main_product = (
        (move.unit_load == ul.id) & (move.product == ul.product))
    per_state = ul.join(move, 'LEFT', condition=on_main_product).select(
        ul.id.as_('unit_load'),
        last_date,
        Max(move.id).as_('move'),
        _move_state().as_('state'),
        where=(_move_state() != 'cancel'),
        group_by=(ul.id, _move_state()))
    query = per_state.select(per_state.unit_load,
        where=(Operator(per_state.state.cast(sql_type), clause[2])))
    return [('id', 'in', query)]
@fields.depends('cases_quantity')
def on_change_with_cases_digits(self, name=None):
    """Return the digits of the cases quantity (same as the default)."""
    digits = self.default_cases_digits()
    return digits
@classmethod
def default_cases_digits(cls):
    """Return the digits of the UoM registered as ``product.uom_unit``."""
    pool = Pool()
    ModelData = pool.get('ir.model.data')
    Uom = pool.get('product.uom')
    unit_id = ModelData.get_id('product', 'uom_unit')
    return Uom(unit_id).digits
# on_change of `quantity_per_case`: when both the quantity per case and the
# number of cases are set, the total quantity becomes their product.
@fields.depends('quantity_per_case', 'cases_quantity',
methods=['quantity'])
def on_change_quantity_per_case(self):
"""Recompute the total quantity from quantity per case and cases."""
if self.quantity_per_case and self.cases_quantity:
self.quantity = self.quantity_per_case * self.cases_quantity
# NOTE(review): confirm whether this cascade is meant to run only when the
# quantity has just been recomputed.
self.on_change_quantity()
def get_quantity_per_case(self, name=None):
    """Return quantity / cases rounded by the UoM, or None if any is empty."""
    if not (self.quantity and self.cases_quantity and self.uom):
        return None
    return self.uom.round(self.quantity / self.cases_quantity)
@classmethod
def set_quantity_per_case(cls, records, name, value):
pass
# on_change of `cases_quantity`: nothing happens while the number of cases
# is empty; otherwise the total quantity follows the quantity per case and
# the production moves are rebuilt.
@fields.depends(*(MOVE_CHANGES + ['quantity_per_case']),
methods=['product'])
def on_change_cases_quantity(self):
"""Recompute the total quantity after the number of cases changed."""
if not self.cases_quantity:
return
if self.quantity_per_case:
self.quantity = self.cases_quantity * self.quantity_per_case
# NOTE(review): confirm whether the production moves must also be rebuilt
# when there is no quantity per case.
self.explode_production_moves()
@classmethod
def delete(cls, records):
    """Delete unit loads after cancelling and deleting all their moves."""
    Move = Pool().get('stock.move')

    def _all_moves():
        return [move for record in records for move in record.moves]

    Move.cancel(_all_moves())
    Move.delete(_all_moves())
    super(UnitLoad, cls).delete(records)
@classmethod
@ModelView.button_action('stock_unit_load.wizard_move_unit_load')
def move_try(cls, records):
    """Button opening the move wizard; the action does all the work."""
@classmethod
@ModelView.button
def draft(cls, records):
    """Button: reset the cancelled and assigned moves to draft.

    Raises the 'state_origin' user error when one of those moves fails
    ``check_move_origin``.
    """
    Move = Pool().get('stock.move')
    moves = []
    for record in records:
        moves.extend(move for move in record.moves
            if move.state in ['cancel', 'assigned'])
    for move in moves:
        if cls.check_move_origin(move):
            continue
        cls.raise_user_error('state_origin', (
                move.unit_load.rec_name, move.origin.rec_name))
    if moves:
        Move.draft(moves)
@classmethod
def get_moves_to_cancel(cls, records):
return [m for r in records for m in r.moves if m.state in
['draft', 'assigned']]
@classmethod
@ModelView.button
def cancel(cls, records):
    """Button: cancel and delete the pending moves of the unit loads.

    Raises the 'state_origin' user error when one of the moves to cancel
    fails ``check_move_origin``.
    """
    Move = Pool().get('stock.move')
    pending = cls.get_moves_to_cancel(records)
    for move in pending:
        if cls.check_move_origin(move):
            continue
        cls.raise_user_error('state_origin', (
                move.unit_load.rec_name, move.origin.rec_name))
    if not pending:
        return
    Move.cancel(pending)
    Move.delete(pending)
@classmethod
@ModelView.button
def assign(cls, records):
    """Button: assign the draft moves of the unit loads.

    The internal quantity is refreshed first; every draft or assigned move
    must pass ``check_move_origin`` (else the 'state_origin' user error is
    raised) and the dates of the production moves are aligned before the
    moves are assigned.
    """
    Move = Pool().get('stock.move')
    # collected before the internal quantity is rewritten
    to_assign = [move for record in records for move in record.moves
        if move.state == 'draft']
    cls._set_internal_quantity(records)
    to_check = [move for record in records for move in record.moves
        if move.state in ('draft', 'assigned')]
    for move in to_check:
        if cls.check_move_origin(move):
            continue
        cls.raise_user_error('state_origin', (
                move.unit_load.rec_name, move.origin.rec_name))
    cls.check_dates(records)
    if to_assign:
        Move.assign(to_assign)
@classmethod
@ModelView.button
def do(cls, records):
"""Button: do the draft and assigned moves of the unit loads.

The dates of the production moves are aligned first, and every move to
do must pass check_move_origin (else the 'state_origin' user error is
raised).
"""
pool = Pool()
Move = pool.get('stock.move')
# moves still pending on the given unit loads
moves = [m for r in records for m in r.moves
if m.state in ('draft', 'assigned')]
cls.check_dates(records)
for move in moves:
if not cls.check_move_origin(move):
cls.raise_user_error('state_origin', (move.unit_load.rec_name,
move.origin.rec_name))
if moves:
Move.do(moves)
# NOTE(review): confirm whether the drop state update and the detaching of
# return moves below are meant to run when there was no move to do.
cls.set_drop_state(records)
# return moves are detached from the unit load
return_moves = [m for r in records for m in r.return_moves]
if return_moves:
Move.write(return_moves, {'unit_load': None})
@classmethod
def set_drop_state(cls, records):
    """Synchronise the ``dropped`` flag with the dropped quantity.

    A dropped unit load without drop moves is undropped.  Otherwise the
    dropped quantity may not exceed the quantity ('wrong_dropped_qty' user
    error); the unit load becomes dropped when it equals the internal
    quantity and stops being so when it is lower.
    """
    to_drop, to_undrop = [], []
    for record in records:
        if record.dropped and not record.drop_moves:
            to_undrop.append(record)
            continue
        dropped_qty = record._get_dropped_quantity()
        if dropped_qty > record.quantity:
            cls.raise_user_error('wrong_dropped_qty', (
                    dropped_qty, record.quantity, record.rec_name))
        if record.dropped:
            if dropped_qty < record.internal_quantity:
                to_undrop.append(record)
        elif dropped_qty == record.internal_quantity:
            to_drop.append(record)
    changes = {True: to_drop, False: to_undrop}
    for flag, targets in changes.iteritems():
        if targets:
            cls.write(targets, {'dropped': flag})
def _get_dropped_quantity(self, product=None, to_uom=None):
if not self.drop_moves:
return 0.0
_product = product or self.product
return sum(self.uom.compute_qty(m.uom, m.quantity, to_uom or self.uom)
for m in self.drop_moves if m.product.id == _product.id)
@classmethod
def _set_internal_quantity(cls, records):
    """Store the quantity converted to the product default UoM.

    Records without quantity are skipped and a write only happens when the
    stored value differs.
    """
    for record in records:
        quantity = record.quantity
        if not quantity:
            continue
        expected = cls._get_internal_quantity(
            quantity, record.uom, record.product)
        if expected == record.internal_quantity:
            continue
        cls.write([record], {'internal_quantity': expected})
@classmethod
def check_move_origin(cls, move):
return not move.origin and not move.shipment
@classmethod
def check_dates(cls, records):
    """Align the dates of the production moves with their unit load.

    Only unit loads whose production is running are handled: input moves
    start and end at the unit load start date, output moves span from its
    start date to its end date.  Everything is written in a single call.
    """
    Move = Pool().get('stock.move')
    actions = []
    for record in records:
        if record.production_state != 'running':
            continue
        inputs = record._get_input_moves_to_check_date()
        outputs = [move for move in record.production_moves
            if move.from_location.type == 'production']
        actions += [
            inputs, {
                'start_date': record.start_date,
                'end_date': record.start_date,
                },
            outputs, {
                'start_date': record.start_date,
                'end_date': record.end_date,
                },
            ]
    if actions:
        Move.write(*actions)
def _get_input_moves_to_check_date(self):
return [m for m in self.production_moves
if m.to_location.type == 'production']
# on_change of `product`: take the UoM and UoM category of the product and
# rebuild the production moves.
@fields.depends(*MOVE_CHANGES)
def on_change_product(self):
"""Refresh UoM data and production moves after the product changed."""
if self.product:
self.uom = self.product.default_uom
self.uom_category = self.uom.category
# NOTE(review): confirm whether the production moves must also be rebuilt
# when the product has been cleared.
self.explode_production_moves()
@fields.depends(*MOVE_CHANGES)
def on_change_quantity(self):
    """Refresh the quantity per case and rebuild the production moves."""
    per_case = self.get_quantity_per_case()
    self.quantity_per_case = per_case
    self.explode_production_moves()
@fields.depends(*MOVE_CHANGES)
def on_change_production_location(self):
    """Rebuild the production moves for the new production location."""
    rebuild = self.explode_production_moves
    rebuild()
@ModelView.button_change(*MOVE_CHANGES)
def reset_production_moves(self):
    """Button: rebuild the production moves from the current values."""
    rebuild = self.explode_production_moves
    rebuild()
def explode_production_moves(self):
    """Replace the production moves by a single freshly built move.

    Nothing is done once production is done.  When the production data is
    incomplete the production moves are emptied.  The first origin found
    on the current production moves is carried over to the new move.
    """
    if self.production_state == 'done':
        return
    if not self._check_production_data():
        self.production_moves = []
        return
    origins = [m.origin for m in (self.production_moves or [])
        if m.origin]
    new_move = self._get_production_move()
    new_move.origin = origins[0] if origins else None
    self.production_moves = [new_move]
def _get_production_move(self):
    """Build the unsaved draft move producing the unit load.

    The move goes from the production location to the storage location of
    the warehouse, for the main product and quantity, planned and
    effective on the day of the start date.

    The start date is now optional, like the company and the warehouse
    already were: this method runs from on_change handlers, where the
    field can be empty and ``None.date()`` used to raise AttributeError.
    """
    Move = Pool().get('stock.move')
    start_day = self.start_date.date() if self.start_date else None
    move = Move(company=self.company,
        from_location=self.production_location,
        to_location=self.warehouse.storage_location
        if self.warehouse else None,
        product=self.product,
        quantity=self.quantity,
        planned_date=start_day,
        effective_date=start_day,
        start_date=self.start_date,
        end_date=self.end_date,
        currency=self.company.currency if self.company else None,
        state='draft',
        origin=None)
    # let the move fill in what depends on its product
    move.on_change_product()
    return move
def _check_production_data(self):
return not (not self.product or
(self.production_type == 'location' and
not self.production_location) or not self.quantity)
@classmethod
# NOTE(review): `extra_params` defaults to a shared mutable dict; it is only
# copied below, never modified, but a None default would be safer.
def drop(cls, records, to_location, start_date, extra_params={}):
""" Drop unit loads over a production location
:param records: list of records or string domain
:param to_location: destination location or id. Must be production
or drop type
:param start_date: starting date; the current time when empty
:param extra_params: dict extra params:
end_date
done_moves
delay (added to the start date to get the end date of each
unit load; the next unit load then starts at that time)
return_locs (dict with product_id as key and location_id
as value)
:return: list of moves of dropping process
"""
pool = Pool()
Move = pool.get('stock.move')
Location = pool.get('stock.location')
to_create = []
_start = start_date or datetime.datetime.now()
# work on a copy: 'end_date' is overwritten per record below
values = extra_params.copy()
if isinstance(to_location, int):
to_location = Location(to_location)
if not records:
return []
# a string is decoded as a PYSON domain, a domain is searched
# NOTE(review): `decode` is called on the PYSONDecoder class, not on an
# instance - confirm this path works (PYSONDecoder().decode(records)?).
if isinstance(records, basestring):
records = PYSONDecoder.decode(records)
if not isinstance(records[0], Model):
records = cls.search(records)
for record in records:
# unit loads already dropped are left alone
if record.dropped:
continue
if values.get('delay'):
values['end_date'] = _start + values['delay']
drop_moves, return_moves = record._drop(
to_location, _start, **values)
to_create.extend(drop_moves + return_moves)
if values.get('delay'):
_start += values['delay']
if to_create:
Move.save(to_create)
if values.get('done_moves', False):
Move.do(to_create)
# NOTE(review): confirm under which of the two conditions above the flag
# must be written; it is applied to every record given.
cls.write(records, {'dropped': True})
return Move.browse(map(int, to_create))
def _drop(self, to_location, start_date, **kwargs):
from_location = None
for move in reversed(self.ul_moves):
if move.to_location.type == 'storage':
from_location = move.to_location
break
self.check_to_move(from_location, to_location, start_date)
new_moves = self._get_new_moves({
'from_location': from_location.id,
'to_location': to_location.id,
'start_date': start_date,
'end_date': kwargs.get('end_date', None),
}, location_type='storage')
return_moves = self._get_new_moves({
'from_location': to_location.id,
'start_date': kwargs.get('end_date', None) or start_date,
'end_date': kwargs.get('end_date', None)
}, location_type='storage')
product_move, = [m for m in return_moves
if m.product.id == self.product.id]
return_moves.remove(product_move)
return_locs = kwargs.get('return_locs', {})
for move in return_moves:
loc = return_locs.get(move.product.id, None)
if not loc:
loc = self._get_return_location(move.product)
if not loc:
self.raise_user_error('missing_return_location',
move.product.rec_name, self.rec_name)
move.to_location = loc
return new_moves, return_moves
def _get_return_location(self, product):
if self.warehouse:
return self.warehouse.storage_location
return None
def get_drop_moves(self, name=None):
    """Return the ids of the moves that drop this unit load.

    These are the non-production moves whose destination is a location
    of type 'production' or 'drop'.  The result is empty while production
    is not done or when the UL has no moves besides the production ones.
    """
    if self.production_state != 'done' or not self.moves:
        return []
    if len(self.moves) == len(self.production_moves):
        return []
    # A set gives constant-time lookups and, unlike a map() result, can
    # be tested more than once on Python 3.
    production_ids = {int(m) for m in self.production_moves}
    return [m.id for m in self.moves
        if m.to_location.type in ('production', 'drop')
        and m.id not in production_ids]
def get_return_moves(self, name=None):
    """Return the ids of the moves returning goods after the last drop.

    These are the non-production moves going from a 'production' or
    'drop' location to a 'storage' one and starting at the latest end
    date found among the last moves.  The result is empty while
    production is not done or when the UL has no drop moves.
    """
    if self.production_state != 'done':
        return []
    if len(self.moves) == len(self.production_moves):
        return []
    if not self.last_moves or not self.drop_moves:
        return []
    at_date = max(m.end_date for m in self.last_moves)
    # A set gives constant-time lookups and, unlike a map() result, can
    # be tested more than once on Python 3.
    production_ids = {int(m) for m in self.production_moves}
    return [m.id for m in self.moves
        if m.from_location.type in ('production', 'drop')
        and m.to_location.type == 'storage'
        and m.id not in production_ids
        and m.start_date == at_date]
def get_shipment_out(self, name=None):
    """Return the id of the customer shipment of this unit load, if any.

    The first move that belongs to a 'stock.shipment.out' and is neither
    draft nor cancelled decides the result; None when there is none.
    """
    for move in self.moves:
        shipment = move.shipment
        if not shipment or shipment.__name__ != 'stock.shipment.out':
            continue
        if move.state in ('draft', 'cancel'):
            continue
        return shipment.id
    return None
@classmethod
@ModelView.button_action('stock_unit_load.wizard_drop_unit_load')
def drop_wizard(cls, records):
    """Button bound to the drop wizard action; the body does nothing."""
@classmethod
def drop_try(cls, records, to_location, start_date, **kwargs):
    """Try to drop the unit loads, reporting failure instead of raising.

    Every record is first validated with ``check_to_move`` (errors raised
    there are propagated); then ``drop`` is called with ``kwargs`` as its
    extra params.

    :return: True when the drop succeeded, False when ``drop`` raised a
        UserError
    """
    # TODO: could be optimized (drop method call 'check_to_move' again)
    for record in records:
        # Reset per record: otherwise the name is unbound for a first
        # record without storage move, or stale from the previous one.
        from_location = None
        for move in reversed(record.ul_moves):
            if move.to_location.type == 'storage':
                from_location = move.to_location
                break
        if not from_location:
            from_location = record.location
        record.check_to_move(from_location, to_location, start_date)
    try:
        cls.drop(records, to_location, start_date, extra_params=kwargs)
    except UserError:
        return False
    return True
@classmethod
def export_data(cls, records, fields_names):
    """Export data with datetimes converted to the company timezone.

    Datetime values returned by the parent export are taken as UTC,
    converted to the timezone of the company found in the context (UTC
    when the company has none) and returned as naive datetimes.
    """
    Company = Pool().get('company.company')
    rows = super(UnitLoad, cls).export_data(records, fields_names)
    company = Company(Transaction().context['company'])
    if company.timezone:
        local_zone = dateutil.tz.gettz(company.timezone)
    else:
        local_zone = dateutil.tz.tzutc()
    utc = dateutil.tz.tzutc()
    for row in rows:
        for position, value in enumerate(row):
            if not isinstance(value, datetime.datetime):
                continue
            localized = value.replace(tzinfo=utc).astimezone(local_zone)
            row[position] = localized.replace(tzinfo=None)
    return rows
@classmethod
def _get_production_type_fields(cls):
return {'location': 'production_location'}
class UnitLoadMove(ModelSQL, ModelView):
    """Unit load movement"""
    __name__ = 'stock.unit_load.move'

    unit_load = fields.Many2One('stock.unit_load', 'UL')
    start_date = fields.DateTime('Start date')
    end_date = fields.DateTime('End date')
    state = fields.Selection([
            ('staging', 'Staging'),
            ('draft', 'Draft'),
            ('assigned', 'Assigned'),
            ('done', 'Done'),
            ('cancel', 'Canceled'),
            ], 'State')
    from_location = fields.Many2One('stock.location', 'From location')
    to_location = fields.Many2One('stock.location', 'To location')
    quantity = fields.Float('Quantity',
        digits=(16, Eval('uom_digits', 2)),
        depends=['uom_digits'])
    uom = fields.Function(
        fields.Many2One('product.uom', 'UOM'),
        'get_uom')
    uom_digits = fields.Function(
        fields.Integer('UOM digits'),
        'get_uom_digits')

    @classmethod
    def __setup__(cls):
        super(UnitLoadMove, cls).__setup__()

    @classmethod
    def table_query(cls):
        """Build the query backing this model.

        Stock moves of the UL's own product are grouped by start date,
        end date, unit load, state and locations, summing their internal
        quantity.  Moves without effective nor planned date are left out.
        """
        pool = Pool()
        Move = pool.get('stock.move')
        UL = pool.get('stock.unit_load')
        move = Move.__table__()
        ul = UL.__table__()

        def move_date():
            return Coalesce(move.effective_date, move.planned_date)

        if backend.name() == 'sqlite':
            # Date and time are concatenated on sqlite; read() turns the
            # resulting string back into a datetime.
            start = Concat(move_date(), Concat(' ', move.time_))
        else:
            start = move_date() + move.time_

        joined = move.join(ul, condition=(
                (move.product == ul.product) & (move.unit_load == ul.id)))
        return joined.select(
            Max(move.id).as_('id'),
            move.unit_load,
            move.state,
            move.from_location,
            move.to_location,
            Literal(0).as_('create_uid'),
            Max(move.create_date).as_('create_date'),
            Literal(None).as_('write_uid'),
            Literal(None).as_('write_date'),
            start.as_('start_date'),
            move.end_date,
            Sum(move.internal_quantity).as_('quantity'),
            where=(move_date() != Null),
            group_by=(start, move.end_date, move.unit_load, move.state,
                move.from_location, move.to_location))

    @classmethod
    def get_uom(cls, records, name=None):
        """Return the UOM id of the unit load of each record."""
        return dict((record.id, record.unit_load.uom.id)
            for record in records)

    @classmethod
    def get_uom_digits(cls, records, name=None):
        """Return the UOM digits of the unit load of each record."""
        return dict((record.id, record.unit_load.uom.digits)
            for record in records)

    @classmethod
    def read(cls, ids, fields_names=None):
        """Read the records, parsing string start dates into datetimes."""
        rows = super(UnitLoadMove, cls).read(
            ids, fields_names=fields_names)
        # Convert str start_date to datetime in sqlite
        for row in rows:
            raw = row.get('start_date')
            if isinstance(raw, basestring):
                row['start_date'] = datetime.datetime.strptime(
                    raw, '%Y-%m-%d %H:%M:%S')
        return rows
class MoveUnitLoadStart(ModelView):
    """Start moving unit load"""
    __name__ = 'stock.unit_load.do_move_start'

    # Date of the move.
    planned_date = fields.DateTime('Planned date', required=True)
    # Destination; only storage locations can be chosen.
    location = fields.Many2One(
        'stock.location', 'Location',
        required=True,
        domain=[('type', '=', 'storage')])

    @staticmethod
    def default_planned_date():
        """Default the planned date to the current time."""
        return datetime.datetime.now()
class MoveUnitLoad(Wizard):
    """Move Unit load"""
    __name__ = 'stock.unit_load.do_move'

    start = StateView(
        'stock.unit_load.do_move_start',
        'stock_unit_load.unit_load_move_start_view_form',
        [
            Button('Cancel', 'end', 'tryton-cancel'),
            Button('OK', 'move_', 'tryton-ok', default=True),
        ])
    move_ = StateTransition()

    def transition_move_(self):
        """Move the active unit loads to the chosen location and date."""
        UnitLoad = Pool().get('stock.unit_load')
        active_ids = Transaction().context.get('active_ids')
        unit_loads = UnitLoad.search([('id', 'in', active_ids)])
        UnitLoad.move(unit_loads,
            to_location=self.start.location,
            at_date=self.start.planned_date)
        return 'end'
class DropUnitLoadStart(ModelView):
    """Start dropping unit load"""
    __name__ = 'stock.unit_load.do_drop_start'

    start_date = fields.DateTime('Start date', required=True)
    end_date = fields.DateTime('End date', required=True)
    # Destination; only production or drop locations can be chosen.
    location = fields.Many2One(
        'stock.location', 'Location',
        required=True,
        domain=[('type', 'in', ('production', 'drop'))])

    @staticmethod
    def default_start_date():
        """Default the start date to the current time."""
        return datetime.datetime.now()

    @staticmethod
    def default_end_date():
        """Default the end date to the current time."""
        return datetime.datetime.now()
class DropUnitLoadFailedProduct(ModelView):
    """Drop unit load product data"""
    __name__ = 'stock.unit_load.do_drop_failed.product'

    # Product to return and the storage location it goes back to.
    product = fields.Many2One(
        'product.product', 'Product',
        required=True)
    location = fields.Many2One(
        'stock.location', 'Return location',
        required=True,
        domain=[('type', '=', 'storage')])
class DropUnitLoadFailed(ModelView):
    """Drop unit load data"""
    __name__ = 'stock.unit_load.do_drop_failed'

    # Return location applied to every product line when it changes.
    location = fields.Many2One(
        'stock.location', 'Return location',
        domain=[('type', '=', 'storage')])
    products = fields.One2Many(
        'stock.unit_load.do_drop_failed.product', None, 'Products')

    @fields.depends('location', 'products')
    def on_change_location(self):
        """Copy the chosen return location to every product line."""
        target = self.location
        if target:
            lines = list(self.products)
            for line in lines:
                line.location = target
            self.products = lines
class DropUnitLoad(Wizard):
    """Drop Unit load"""
    __name__ = 'stock.unit_load.do_drop'

    start = StateView(
        'stock.unit_load.do_drop_start',
        'stock_unit_load.unit_load_drop_start_view_form',
        [
            Button('Cancel', 'end', 'tryton-cancel'),
            Button('Set return locations', 'failed', 'tryton-go-next'),
            Button('OK', 'try_', 'tryton-ok', default=True),
        ])
    try_ = StateTransition()
    failed = StateView(
        'stock.unit_load.do_drop_failed',
        'stock_unit_load.unit_load_drop_failed_view_form',
        [
            Button('Cancel', 'end', 'tryton-cancel'),
            Button('OK', 'force', 'tryton-ok', default=True),
        ])
    force = StateTransition()

    def transition_try_(self):
        """Try the drop; ask for return locations when it fails."""
        Unitload = Pool().get('stock.unit_load')
        unit_load = Unitload(Transaction().context['active_id'])
        dropped = Unitload.drop_try([unit_load],
            to_location=self.start.location,
            start_date=self.start.start_date,
            end_date=self.start.end_date)
        return 'end' if dropped else 'failed'

    def default_failed(self, fields):
        """Propose one line per last move of a product other than the
        unit load's own product.
        """
        Unitload = Pool().get('stock.unit_load')
        unit_load = Unitload(Transaction().context['active_id'])
        lines = [{'product': move.product.id}
            for move in unit_load.last_moves
            if move.product.id != unit_load.product.id]
        return {'products': lines}

    def transition_force(self):
        """Drop the unit load using the return locations typed by the
        user.
        """
        Unitload = Pool().get('stock.unit_load')
        unit_load = Unitload(Transaction().context.get('active_id'))
        return_locs = dict((line.product.id, line.location.id)
            for line in self.failed.products)
        params = {
            'end_date': self.start.end_date,
            'return_locs': return_locs,
            }
        Unitload.drop([unit_load],
            to_location=self.start.location,
            start_date=self.start.start_date,
            extra_params=params)
        return 'end'
# TODO: implement odt report
class UnitLoadLabel(CompanyReport):
    """Unit load label"""
    __name__ = 'stock.unit_load.label'

    @classmethod
    def get_context(cls, records, data):
        """Add the label helper functions to the report context."""
        report_context = super(UnitLoadLabel, cls).get_context(records, data)
        # Bound classmethods take the same arguments as the helpers.
        report_context['extra_info'] = cls.extra_info
        report_context['product_name'] = cls.product_name
        report_context['counter'] = cls.counter
        report_context['origin'] = cls.origin
        report_context['format_datetime'] = cls.format_datetime
        return report_context

    @classmethod
    def extra_info(cls, unit_load, language):
        """Returns a list of tuples with extra info for UL label"""
        with Transaction().set_context(language=language):
            if unit_load.warehouse and unit_load.production_location:
                return [(unit_load.production_location.rec_name, '')]
        return []

    @classmethod
    def product_name(cls, ul_id, language):
        """Return the product name of the unit load in ``language``."""
        UnitLoad = Pool().get('stock.unit_load')
        with Transaction().set_context(language=language):
            return UnitLoad(ul_id).product.rec_name

    @classmethod
    def counter(cls, unit_load):
        """Return 'position / total' of the unit load among the ids given
        by ``get_counter_data``, or '' when there are none.
        """
        _items = cls.get_counter_data(unit_load)
        if _items:
            return '%s / %s' % (_items.index(unit_load.id) + 1, len(_items))
        return ''

    @classmethod
    def get_counter_data(cls, unit_load):
        """Return the unit load ids used by ``counter``; empty here."""
        return []

    @classmethod
    def origin(cls, unit_load, language):
        """Return a one-item list with the translated 'origin' label and
        the country code of the first address of the company party.

        The code is '' when the party has no address or the address has
        no country.
        """
        with Transaction().set_context(language=language):
            addresses = unit_load.company.party.addresses
            # A party without addresses must not break the label.
            country = addresses[0].country if addresses else None
            return [(cls.get_translation('stock.move', 'origin'),
                country.code if country else '')]

    @staticmethod
    def get_translation(model, field):
        """Return the label of ``field`` of ``model``."""
        _Model = Pool().get(model)
        res = _Model.fields_get(fields_names=[field])
        return res[field]['string']

    @classmethod
    def format_datetime(cls, value, company):
        """Convert a naive UTC datetime to the company timezone.

        UTC is used when the company has no timezone; the result is
        naive.
        """
        lzone = (dateutil.tz.gettz(company.timezone) if company.timezone
            else dateutil.tz.tzutc())
        szone = dateutil.tz.tzutc()
        return value.replace(tzinfo=szone).astimezone(
            lzone).replace(tzinfo=None)
class BatchDropUnitLoadData(ModelView):
    """Batch dropping UL data"""
    __name__ = 'stock.unit_load.batch_drop.data'

    location = fields.Many2One('stock.location', 'Location', required=True,
        domain=[('type', 'in', ['production', 'drop'])])
    start_date = fields.DateTime('Start date', required=True)
    end_date = fields.DateTime('End date', required=True,
        domain=[('end_date', '>=', Eval('start_date'))],
        depends=['start_date'])
    # Time span given to each unit load.
    delay_ = fields.TimeDelta('Delay')
    # No reverse field: pass None (not the string 'None'), as
    # DropUnitLoadFailed.products does.
    unit_loads = fields.One2Many('stock.unit_load', None, 'Unit loads',
        required=True,
        domain=[('available', '=', True)])

    @fields.depends('unit_loads', 'start_date', 'end_date')
    def on_change_with_delay_(self):
        """Split the start/end interval evenly among the unit loads.

        Returns None while any of the three values is missing.
        """
        if self.start_date and self.end_date and self.unit_loads:
            _delta = self.end_date - self.start_date
            return _delta / len(self.unit_loads)
        return None

    @fields.depends('delay_', 'start_date', 'unit_loads')
    def on_change_delay_(self):
        """Recompute the end date as start date plus one delay per unit
        load.
        """
        if self.delay_ and self.start_date and self.unit_loads:
            self.end_date = self.start_date + (
                self.delay_ * len(self.unit_loads))
class BatchDropUnitLoadConfirm(ModelView):
    """Batch dropping UL confirm"""
    __name__ = 'stock.unit_load.batch_drop.confirm'

    # No reverse field: pass None (not the string 'None'), as
    # DropUnitLoadFailed.products does.
    unit_loads = fields.One2Many('stock.unit_load', None, 'Unit loads',
        readonly=True)
class BatchDropUnitLoad(Wizard):
    """Massive UL dropping"""
    __name__ = 'stock.unit_load.batch_drop'

    start = StateTransition()
    data = StateView(
        'stock.unit_load.batch_drop.data',
        'stock_unit_load.unit_load_batch_drop_data_view_form',
        [
            Button('Cancel', 'end', 'tryton-cancel'),
            Button('OK', 'confirm', 'tryton-ok', default=True),
        ])
    confirm = StateView(
        'stock.unit_load.batch_drop.confirm',
        'stock_unit_load.unit_load_batch_drop_confirm_view_form',
        [
            Button('Cancel', 'end', 'tryton-cancel'),
            # Hidden unless the user is in the batch drop group.
            Button('Confirm', 'do_', 'tryton-ok', default=True,
                states={'invisible': ~Eval('groups', []).contains(
                    Id('stock_unit_load', 'group_unit_load_batch_drop'))}),
        ])
    do_ = StateTransition()

    def transition_start(self):
        """Go straight to the data form."""
        return 'data'

    def default_data(self, fields):
        """Preload the data form with the active unit loads, if any."""
        active_ids = Transaction().context.get('active_ids')
        if not active_ids:
            return {}
        return {'unit_loads': Transaction().context['active_ids']}

    def default_confirm(self, fields):
        """Show on the confirm form the unit loads chosen on the data
        form.
        """
        chosen_ids = [int(unit_load) for unit_load in self.data.unit_loads]
        return {'unit_loads': chosen_ids}

    def transition_do_(self):
        """Drop the confirmed unit loads, doing the created moves."""
        Unitload = Pool().get('stock.unit_load')
        if not self.data.unit_loads:
            return 'end'
        confirmed_ids = [int(unit_load)
            for unit_load in self.confirm.unit_loads]
        unit_loads = Unitload.browse(confirmed_ids)
        params = {
            'delay': self.data.delay_,
            'done_moves': True,
            }
        Unitload.drop(unit_loads, self.data.location,
            self.data.start_date, extra_params=params)
        return 'end'