Implement internal shipment integration.

This commit refs #1382
This commit is contained in:
Sergio Morillo 2016-09-06 13:17:43 +02:00
parent c2a19ded22
commit 0c9c43ca20
5 changed files with 488 additions and 46 deletions

103
load.py
View File

@ -127,34 +127,46 @@ class LoadOrder:
res.ul_quantity = len(grouped_items)
return res
def _get_shipment_sale(self, sale):
res = super(LoadOrder, self)._get_shipment_sale(sale)
def _get_shipment_out(self, sale):
res = super(LoadOrder, self)._get_shipment_out(sale)
res.start_date = self.start_date
res.on_change_start_date()
res.end_date = self.end_date
return res
def _get_shipment_moves(self, sale_line, grouped_items):
def _get_shipment_moves(self, origin, grouped_items):
pool = Pool()
Move = pool.get('stock.move')
moves = []
other_moves = []
if self.type == 'out':
from_location = origin.from_location.id
to_location = origin.to_location.id
elif self.type == 'internal':
to_location = self.to_location.id
for item in grouped_items:
new_moves = item._get_new_moves({'from_location': sale_line.from_location.id,
'to_location': sale_line.to_location.id,
'start_date': self.end_date,
'end_date': self.end_date,
'state': 'draft'})
move, = [m for m in new_moves if m.product == item.product]
move.origin = sale_line
moves.append(move)
if self.type == 'internal':
from_location = item.location.id
new_moves = item._get_new_moves({
'from_location': from_location,
'to_location': to_location,
'start_date': self.end_date,
'end_date': self.end_date,
'state': 'draft'})
for new_move in new_moves:
if new_move.product.id == item.product.id:
continue
new_move.origin = item.load_line
other_moves.append(new_move)
if self.type == 'out':
move, = [m for m in new_moves if m.product == item.product]
move.origin = origin
moves.append(move)
for new_move in new_moves:
if new_move.product.id == item.product.id:
continue
new_move.origin = item.load_line
other_moves.append(new_move)
elif self.type == 'internal':
moves.extend(new_moves)
if other_moves:
Move.save(other_moves)
@ -169,14 +181,18 @@ class LoadOrder:
all(ul not in self.unit_loads for ul in uls))
_add = uls[0] not in self.unit_loads
if not self.sale:
if not self.shipment:
return
if self.sale.state not in ('draft', 'quotation'):
self.raise_user_error('sale_confirmed', self.sale.rec_name)
if self.type == 'out':
if not self.sale:
return
if self.sale.state not in ('draft', 'quotation'):
self.raise_user_error('sale_confirmed', self.sale.rec_name)
if not _add and len(uls) == len(self.unit_loads):
self.raise_user_error('unload_cancel', self.rec_name)
keyfunc = partial(self._group_sale_line_key, uls)
keyfunc = partial(self._group_line_key, uls)
items = sorted(uls, key=keyfunc)
for key, grouped_items in groupby(items, key=keyfunc):
_groupitems = list(grouped_items)
@ -192,32 +208,37 @@ class LoadOrder:
line_values.append(value)
return line_values
sale_line = [l for l in self.sale.lines
if get_line_values(l) == key_dict.values()]
sale_line = None
if self.sale:
sale_line = [l for l in self.sale.lines
if get_line_values(l) == key_dict.values()]
if not sale_line:
if not _add:
continue
sale_line = self._get_load_sale_line(key, _groupitems)
sale_line.sale = self.sale
else:
sale_line, = sale_line
self._update_sale_line(sale_line, _groupitems, _add)
sale_line.save()
if self.type == 'out':
if not sale_line:
if not _add:
continue
sale_line = self._get_load_sale_line(key, _groupitems)
sale_line.sale = self.sale
else:
sale_line, = sale_line
self._update_sale_line(sale_line, _groupitems, _add)
sale_line.save()
shipment = sale_line.sale.shipments[0]
shipment = self.shipment
if _add:
outgoing_moves = self._get_shipment_moves(sale_line, _groupitems)
inventory_moves = []
for move in outgoing_moves:
move.shipment = shipment
_inventory = shipment._get_inventory_move(move)
_inventory.start_date = self.start_date
inventory_moves.append(_inventory)
if self.type == 'out':
_inventory = shipment._get_inventory_move(move)
_inventory.start_date = self.start_date
inventory_moves.append(_inventory)
Move.save(outgoing_moves + inventory_moves)
Move.assign(outgoing_moves)
Move.do(inventory_moves)
if self.type == 'out':
Move.do(inventory_moves)
else:
shipment_moves = [m for m in shipment.moves if m.unit_load in _groupitems]
load_moves = [m for m in self.inventory_moves if m.unit_load in _groupitems]
@ -225,7 +246,7 @@ class LoadOrder:
with Transaction().set_context(check_origin=False):
Move.cancel(shipment_moves + load_moves)
Move.delete(shipment_moves + load_moves)
if not sale_line.quantity:
if sale_line and not sale_line.quantity:
Saleline.delete([sale_line])
to_do = []
@ -331,13 +352,7 @@ class LoadOrder:
self.raise_user_error('ul_location', unit_load.rec_name)
# check it is in warehouse
wh = None
while not wh:
location = location.parent
if not location:
break
if location.type == 'warehouse':
wh = location
wh = location.warehouse
if not wh or wh.id != self.load.warehouse.id:
self.raise_user_error('ul_warehouse', (
unit_load.rec_name, self.load.warehouse.rec_name,

View File

@ -390,6 +390,8 @@ Check sale::
u'Plastic Case 30x30'
>>> order.outgoing_moves[0].quantity
2.0
>>> order.shipment == shipment
True
Force load another UL::

View File

@ -0,0 +1,414 @@
================
Carrier load UL
================
Imports::
>>> import datetime
>>> from dateutil.relativedelta import relativedelta
>>> from decimal import Decimal
>>> from trytond.modules.company.tests.tools import create_company, \
... get_company
>>> from trytond.modules.carrier_vehicle.tests.tools import create_carrier
>>> from trytond.modules.account.tests.tools import create_fiscalyear, \
... create_chart, get_accounts, create_tax
>>> from.trytond.modules.account_invoice.tests.tools import \
... set_fiscalyear_invoice_sequences, create_payment_term
>>> from trytond.modules.stock_unit_load.tests.tools import create_unit_load
>>> from proteus import config, Model, Wizard, Report
>>> today = datetime.date.today()
Create database::
>>> config = config.set_trytond()
>>> config.pool.test = True
Install agro Module::
>>> Module = Model.get('ir.module')
>>> module, = Module.find([('name', '=', 'carrier_load_ul')])
>>> module.click('install')
>>> Wizard('ir.module.install_upgrade').execute('upgrade')
Create company::
>>> _ = create_company()
>>> company = get_company()
Reload the context::
>>> User = Model.get('res.user')
>>> Group = Model.get('res.group')
>>> config._context = User.get_preferences(True, config.context)
Create sale user::
>>> sale_user = User()
>>> sale_user.name = 'Sale'
>>> sale_user.login = 'sale'
>>> sale_user.main_company = company
>>> sale_group, = Group.find([('name', '=', 'Sales')])
>>> sale_user.groups.append(sale_group)
>>> sale_user.save()
Create stock user::
>>> stock_user = User()
>>> stock_user.name = 'Stock'
>>> stock_user.login = 'stock'
>>> stock_user.main_company = company
>>> stock_group, = Group.find([('name', '=', 'Stock')])
>>> stock_user.groups.append(stock_group)
>>> stock_user.save()
Create account user::
>>> account_user = User()
>>> account_user.name = 'Account'
>>> account_user.login = 'account'
>>> account_user.main_company = company
>>> account_group, = Group.find([('name', '=', 'Account')])
>>> account_user.groups.append(account_group)
>>> account_user.save()
Create fiscal year::
>>> fiscalyear = set_fiscalyear_invoice_sequences(
... create_fiscalyear(company))
>>> fiscalyear.click('create_period')
Create chart of accounts::
>>> _ = create_chart(company)
>>> accounts = get_accounts(company)
>>> revenue = accounts['revenue']
>>> expense = accounts['expense']
>>> cash = accounts['cash']
>>> Journal = Model.get('account.journal')
>>> cash_journal, = Journal.find([('type', '=', 'cash')])
>>> cash_journal.credit_account = cash
>>> cash_journal.debit_account = cash
>>> cash_journal.save()
Create tax::
>>> tax = create_tax(Decimal('.10'))
>>> tax.save()
Create parties::
>>> Party = Model.get('party.party')
>>> customer = Party(name='Customer')
>>> customer.save()
Create payment term::
>>> payment_term = create_payment_term()
>>> payment_term.save()
Create carrier::
>>> carrier = create_carrier(config)
>>> carrier_product = carrier.carrier_product.template
>>> carrier_product.purchasable = True
>>> carrier_product.purchase_uom = carrier_product.default_uom
>>> carrier_product.account_expense = expense
>>> carrier_product.save()
Get warehouse and dock::
>>> Location = Model.get('stock.location')
>>> wh, = Location.find([('type', '=', 'warehouse')])
>>> dock = wh.docks.new()
>>> dock.name = 'Dock 1'
>>> dock.code = 'D1'
>>> wh.save()
>>> wh2, = wh.duplicate()
>>> wh2.name = 'Warehouse 2'
>>> wh2.code = 'WH2'
>>> Dock = Model.get('stock.location.dock')
>>> d2 = wh2.docks.new()
>>> d2.name = 'Dock WH2'
>>> wh2.save()
Create carrier load::
>>> Load = Model.get('carrier.load')
>>> load = Load()
>>> load.company != None
True
>>> load.state
'draft'
>>> load.date == today
True
>>> load.warehouse = wh
>>> load.warehouse_output == load.warehouse.output_location
True
>>> load.dock != None
True
>>> load.carrier = carrier
>>> load.vehicle != None
True
>>> load.save()
>>> load.code != None
True
Create load order::
>>> Order = Model.get('carrier.load.order')
>>> order = Order(load=load)
>>> order.type = 'internal'
>>> order.start_date = datetime.datetime.now() - relativedelta(minutes=20)
>>> order.state
'draft'
>>> order.ul_origin_restrict
True
>>> order.to_location = wh2.storage_location
>>> line = order.lines.new()
>>> line.ul_quantity = Decimal(1)
>>> order.save()
>>> order.code != None
True
>>> order.date == load.date
True
>>> order.click('wait')
>>> order.state
u'waiting'
Create unit load::
>>> Unitload = Model.get('stock.unit_load')
>>> ul = create_unit_load(config=config, do_state=False,
... default_values={'start_date': datetime.datetime.now() + relativedelta(days=-1)})
>>> template = ul.product.template
>>> template.salable = True
>>> template.account_expense = expense
>>> template.account_revenue = revenue
>>> template.save()
>>> main_product = ul.product
Add other products to unit load::
>>> ProductUom = Model.get('product.uom')
>>> unit, = ProductUom.find([('name', '=', 'Unit')])
>>> ProductTemplate = Model.get('product.template')
>>> Product = Model.get('product.product')
>>> ProductCategory = Model.get('product.category')
>>> category = ProductCategory(name='Category')
>>> category.save()
>>> product = Product()
>>> template = ProductTemplate()
>>> template.name = 'Plastic Case 30x30'
>>> template.categories.append(category)
>>> template.default_uom = unit
>>> template.type = 'goods'
>>> template.salable = True
>>> template.list_price = Decimal('10')
>>> template.cost_price = Decimal('5')
>>> template.account_expense = expense
>>> template.account_revenue = revenue
>>> template.customer_taxes.append(tax)
>>> template.save()
>>> product.template = template
>>> product.save()
>>> move = ul.moves.new()
>>> move.planned_date = today
>>> move.product = product
>>> move.quantity = Decimal(2)
>>> move.from_location = ul.moves[0].from_location
>>> move.to_location = ul.moves[0].to_location
>>> move.currency = move.company.currency
>>> ul.save()
Starting load wizard::
>>> start_load = Wizard('carrier.load_uls', [])
>>> start_load.form.load_order = order
>>> start_load.execute('data')
>>> start_load.form.load_order == order
True
>>> len(start_load.form.uls_loaded)
0
>>> start_load.form.loaded_uls
0
>>> start_load.form.ul_code = 'X'
>>> start_load.execute('load_') # doctest:
Traceback (most recent call last):
...
UserError: ('UserError', ('Cannot find Unit load "X".', ''))
Check UL loading restrictions::
>>> start_load.form.ul_code = ul.code
>>> start_load.execute('load_') # doctest:
Traceback (most recent call last):
...
UserError: ('UserError', (u'UL "1" must be in Done state.', ''))
>>> ul.click('assign')
>>> ul.click('do')
>>> start_load.execute('load_') # doctest:
Traceback (most recent call last):
...
UserError: ('UserError', (u'UL "1" does not belong to any origin of this load order.', ''))
>>> order.ul_origin_restrict = False
>>> order.save()
>>> start_load.execute('load_') # doctest:
Traceback (most recent call last):
...
UserWarning: ('UserWarning', ('loading_ul_origin_1', u'UL "1" does not belong to any origin of this load order.', ''))
>>> Model.get('res.user.warning')(user=config.user,
... name='loading_ul_origin_1', always=True).save()
>>> start_load.execute('load_')
>>> start_load.form.loaded_uls
1
>>> order.reload()
>>> order.state
u'running'
>>> order.start_date != None
True
>>> len(order.unit_loads)
1
>>> start_load.form.ul_code = ul.code
>>> start_load.execute('load_') # doctest:
Traceback (most recent call last):
...
UserError: ('UserError', (u'UL "1" is already loaded.', ''))
Add an invalid UL::
>>> Location = Model.get('stock.location')
>>> lost_found, = Location.find([('type', '=', 'lost_found')], limit=1)
>>> ul2 = create_unit_load(do_state=False,
... default_values={'start_date': datetime.datetime.now() + relativedelta(days=-1)})
>>> ul2.save()
>>> ul2.click('do')
>>> move_ul = Wizard('stock.unit_load.do_move', [ul2])
>>> move_ul.form.location = lost_found
>>> move_ul.form.date = ul2.end_date + relativedelta(minutes=30)
>>> move_ul.execute('move_')
>>> ul2.click('do')
>>> start_load.form.ul_code = ul2.code
>>> start_load.execute('load_') # doctest:
Traceback (most recent call last):
...
UserError: ('UserError', (u'UL "2" must be in a storage location.', ''))
>>> move_ul = Wizard('stock.unit_load.do_move', [ul2])
>>> move_ul.form.location = ul.location
>>> move_ul.form.date = ul2.end_date + relativedelta(minutes=45)
>>> move_ul.execute('move_')
>>> ul2.reload()
>>> ul2.click('do')
>>> Model.get('res.user.warning')(user=config.user,
... name='loading_ul_origin_2', always=True).save()
>>> start_load.execute('load_') # doctest:
Traceback (most recent call last):
...
UserError: ('UserError', (u'All valid lines of load order "1" are complete. Cannot load more ULs.', ''))
Unload UL::
>>> order.lines[0].ul_quantity += 1
>>> order.save()
>>> start_load.execute('load_')
>>> start_load.form.loaded_uls
2
>>> start_load.form.uls_loaded.append(ul2)
>>> start_load.execute('unload_')
>>> start_load.form.loaded_uls
1
>>> ul2.load_line == None
True
>>> start_load.execute('end')
>>> order.reload()
>>> len(order.unit_loads)
1
>>> start_load = Wizard('carrier.load_uls', [order])
>>> start_load.form.ul_code = ul2.code
>>> start_load.execute('load_')
>>> start_load.execute('end')
>>> order.reload()
>>> len(order.unit_loads)
2
>>> ul2.click('unload')
>>> order.reload()
>>> len(order.unit_loads)
1
Finish loading::
>>> start_load = Wizard('carrier.load_uls', [order])
>>> start_load.execute('do_') # doctest:
Traceback (most recent call last):
...
UserWarning: ('UserWarning', ('pending_uls_1', 'You have loaded less ULs (1) than expected (2).', ''))
>>> Model.get('res.user.warning')(user=config.user,
... name='pending_uls_1', always=True).save()
>>> start_load.execute('do_')
>>> order.reload()
>>> len(order.unit_loads)
1
>>> order.state
u'done'
Check internal shipment::
>>> shipment = order.shipment
>>> not shipment
False
>>> shipment.state
u'assigned'
>>> shipment.planned_date == order.end_date.date()
True
>>> shipment.effective_date == order.end_date.date()
True
>>> len(shipment.moves)
2
>>> set(m.state for m in shipment.moves)
set([u'assigned'])
>>> shipment.moves[0].unit_load.id == ul.id
True
>>> not order.inventory_moves
True
>>> not order.outgoing_moves
True
Force load another UL::
>>> Unitload = Model.get('stock.unit_load')
>>> ul = create_unit_load(config=config, do_state=False,
... default_values={'start_date': datetime.datetime.now() + relativedelta(days=-1)})
>>> ul.product = main_product
>>> ul.save()
>>> move = ul.moves.new()
>>> move.planned_date = today
>>> move.product = product
>>> move.quantity = Decimal(2)
>>> move.from_location = ul.moves[0].from_location
>>> move.to_location = ul.moves[0].to_location
>>> move.currency = move.company.currency
>>> ul.save()
>>> ul.click('assign')
>>> ul.click('do')
>>> Model.get('res.user.warning')(user=config.user,
... name='loading_ul_origin_3', always=True).save()
>>> start_load = Wizard('carrier.load_uls', [order])
>>> start_load.form.ul_code = ul.code
>>> start_load.execute('load_')
>>> start_load.execute('end')
>>> shipment.reload()
>>> len(shipment.unit_loads)
2
>>> ul in shipment.unit_loads
True
>>> len(shipment.moves)
4
>>> set(m.state for m in shipment.moves)
set([u'assigned'])
>>> order.reload()
>>> not order.inventory_moves
True
>>> not order.outgoing_moves
True

View File

@ -26,4 +26,8 @@ def suite():
'scenario_force_unload.rst',
setUp=doctest_setup, tearDown=doctest_teardown, encoding='utf-8',
optionflags=doctest.REPORT_ONLY_FIRST_FAILURE))
suite.addTests(doctest.DocFileSuite(
'scenario_load_ul_internal.rst',
setUp=doctest_setup, tearDown=doctest_teardown, encoding='utf-8',
optionflags=doctest.REPORT_ONLY_FIRST_FAILURE))
return suite

View File

@ -72,12 +72,19 @@ class UnitLoad:
@classmethod
def get_available(cls, records, name=None):
res = super(UnitLoad, cls).get_available(records, name)
sub_records = [r for r in records if r.load_line]
sub_records = [r for r in records
if r.load_line and r.load_line.order.type == 'out']
res.update({r.id: False for r in sub_records})
return res
@classmethod
def search_available(cls, name, clause):
reverse = {
'=': '!=',
'!=': '='}
res = super(UnitLoad, cls).search_available(name, clause)
res.append(('load_line', clause[1], None))
if clause[2]:
res.append(('load_line.order.type', reverse[clause[1]], 'out'))
else:
res.append(('load_line.order.type', clause[1], 'out'))
return res