1458 lines
52 KiB
Python
1458 lines
52 KiB
Python
# The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
|
|
from functools import partial
|
|
from itertools import groupby
|
|
from decimal import Decimal
|
|
from sql import Null
|
|
from sql.operators import Concat
|
|
from trytond.model import fields, ModelView, Model, ModelSQL, Unique, Workflow
|
|
from trytond.pool import PoolMeta, Pool
|
|
from trytond.pyson import Eval, Bool, Id, If, And
|
|
from trytond.tools import reduce_ids
|
|
from trytond.transaction import Transaction
|
|
from trytond.wizard import Wizard, StateTransition, StateView, Button, \
|
|
StateAction
|
|
from trytond.exceptions import UserError, UserWarning
|
|
from trytond.i18n import gettext
|
|
from .exceptions import (AddUnitLoadError, AddUnitLoadWarning,
|
|
AddUnitLoadOverloadError, AddUnitLoadOriginError, AddUnitLoadOriginWarning)
|
|
from trytond.modules.stock_unit_load import cases_digits
|
|
try:
|
|
import phonenumbers
|
|
from phonenumbers import PhoneNumberFormat, NumberParseException
|
|
except ImportError:
|
|
phonenumbers = None
|
|
|
|
|
|
class Configuration(metaclass=PoolMeta):
    """Carrier configuration extended with unit load (UL) loading options."""
    __name__ = 'carrier.configuration'

    # Whether the origin of a UL is restricted when it is loaded in an order.
    ul_origin_restrict = fields.Boolean(
        'Restrict UL origin',
        help='Restricts origin of UL when loading in a Load order.')
    # Magnitudes ('uls' and/or 'cases') selected for quantity checks.
    quantity_check = fields.MultiSelection(
        [
            ('uls', 'ULs'),
            ('cases', 'Cases'),
        ],
        "Quantity Check", sort=False)

    @classmethod
    def default_ul_origin_restrict(cls):
        """Return the default for ``ul_origin_restrict`` (enabled)."""
        return True

    @classmethod
    def default_quantity_check(cls):
        """Return the default for ``quantity_check`` (only ULs)."""
        return ['uls']
|
|
|
|
|
|
class Load(metaclass=PoolMeta):
    """Carrier load extended with the unit loads (ULs) of its orders."""
    __name__ = 'carrier.load'

    # Read-only aggregation of the ULs of every order of the load.
    unit_loads = fields.Function(
        fields.One2Many('stock.unit_load', None, 'Unit loads'),
        'get_unit_loads', searcher='search_unit_loads')

    def get_unit_loads(self, name=None):
        """Return the ids of the ULs of all the orders of this load."""
        if not self.orders:
            return []
        return [ul.id for order in self.orders for ul in order.unit_loads]

    @classmethod
    def search_unit_loads(cls, name, clause):
        """Delegate the search on ``unit_loads`` to the orders' ULs."""
        return [('orders.unit_loads', ) + tuple(clause[1:])]

    def add_ul(self, unit_loads, origin_restrict=None,
            origin_restrict_warn=True, force=False):
        """Add ``unit_loads`` to the first order of the load accepting them.

        Orders are tried in sequence. The first one whose ``add_ul`` does
        not raise is returned. If every order raises ``AddUnitLoadError`` or
        ``AddUnitLoadWarning`` the last of those exceptions is re-raised.
        Returns ``None`` when the load has no orders.
        """
        last_error = None
        for order in self.orders:
            try:
                # TODO: manage failed ULs (the list returned by
                # order.add_ul is currently ignored here)
                order.add_ul(unit_loads,
                    origin_restrict=origin_restrict,
                    origin_restrict_warn=origin_restrict_warn,
                    force=force)
            except (AddUnitLoadError, AddUnitLoadWarning) as e:
                last_error = e
            else:
                # the order accepted the ULs: nothing else to try
                return order
        if last_error:
            raise last_error

    @property
    def ul_quantity(self):
        """Total number of ULs planned in the orders of this load.

        An order reports ``None`` when any of its lines has no UL quantity;
        such orders count as 0 so the sum does not fail with ``TypeError``.
        """
        return sum(o.ul_quantity or 0 for o in (self.orders or []))
|
|
|
|
|
|
class LoadOrder(metaclass=PoolMeta):
    """Load order extended with unit load (UL) handling."""
    __name__ = 'carrier.load.order'

    # ULs of the order, computed from its lines.
    unit_loads = fields.Function(
        fields.One2Many(
            'stock.unit_load', None, 'Unit loads',
            states={'readonly': Eval('state').in_(['cancelled', 'done'])},
            depends=['state']),
        'get_unit_loads',
        setter='set_unit_loads',
        searcher='search_unit_loads')
    ul_origin_restrict = fields.Boolean(
        'Restrict UL origin',
        states={'readonly': Eval('state').in_(['cancelled', 'done'])},
        depends=['state'])
    # Planned number of ULs (see get_ul_quantity).
    ul_quantity = fields.Function(
        fields.Float('ULs', digits=(16, 0)),
        'get_ul_quantity')
    # Number of ULs currently in the order.
    loaded_uls = fields.Function(
        fields.Float('Loaded ULs', digits=(16, 0)),
        'get_loaded_uls')
    # Cases of the ULs currently in the order.
    loaded_cases = fields.Function(
        fields.Float('Loaded Cases', digits=cases_digits),
        'get_loaded_cases')
    # Planned cases, summed from the lines.
    cases_quantity = fields.Function(
        fields.Float('Cases', digits=cases_digits),
        'get_cases_quantity')
|
|
|
|
@classmethod
def __setup__(cls):
    """Register the ``run_try`` button, visible in waiting/running states."""
    super(LoadOrder, cls).__setup__()
    run_try_button = {
        'icon': 'tryton-forward',
        'invisible': ~Eval('state').in_(['waiting', 'running']),
        'depends': ['state']
    }
    cls._buttons.update({'run_try': run_try_button})
|
|
|
|
@classmethod
def __register__(cls, module_name):
    """Register the model and fill ``sale`` on already done 'out' orders."""
    Sale = Pool().get('sale.sale')
    sale = Sale.__table__()
    sql_table = cls.__table__()
    cursor = Transaction().connection.cursor()

    super(LoadOrder, cls).__register__(module_name)

    # Migration from 4.0: set sale
    # A sale belongs to an order when its origin is "<model name>,<id>".
    origin_match = (
        Concat(cls.__name__ + ',', sql_table.id) == sale.origin)
    pending = (
        (sql_table.sale == Null)
        & (sql_table.type == 'out')
        & (sql_table.state == 'done'))
    query = sql_table.join(sale, condition=origin_match).select(
        sql_table.id, sale.id,
        where=pending,
        group_by=[sql_table.id, sale.id])
    cursor.execute(*query)
    # fetchall() materialises the rows, so the cursor can be reused below.
    for order_id, sale_id in cursor.fetchall():
        update = sql_table.update(
            [sql_table.sale], [sale_id],
            where=sql_table.id == order_id)
        cursor.execute(*update)
|
|
|
|
@classmethod
def default_ul_origin_restrict(cls):
    """Default to the value set in the carrier configuration record."""
    Configuration = Pool().get('carrier.configuration')
    return Configuration(1).ul_origin_restrict
|
|
|
|
def get_unit_loads(self, name=None):
    """Return the ids of the ULs reported by every line of the order."""
    ul_ids = []
    # ``or ()`` covers an order whose ``lines`` is empty or None.
    for line in self.lines or ():
        ul_ids.extend(ul.id for ul in line.get_unit_loads())
    return ul_ids
|
|
|
|
def get_loaded_uls(self, name=None):
    """Return how many ULs the order currently holds (0 when none)."""
    return len(self.unit_loads) if self.unit_loads else 0
|
|
|
|
def get_loaded_cases(self, name=None):
    """Return the sum of ``cases_quantity`` of the ULs of the order."""
    total = 0
    if self.unit_loads:
        total = sum(ul.cases_quantity for ul in self.unit_loads)
    return total
|
|
|
|
def get_cases_quantity(self, name=None):
    """Return the sum of ``cases_quantity`` of the lines of the order."""
    total = 0
    if self.lines:
        total = sum(line.cases_quantity for line in self.lines)
    return total
|
|
|
|
@classmethod
def set_unit_loads(cls, records, name, value):
    """Setter of ``unit_loads``: intentionally does nothing."""
    return None
|
|
|
|
@classmethod
def search_unit_loads(cls, name, clause):
    """Build the search domain for the ``unit_loads`` function field.

    Matches the ULs of the lines, or the loaded ULs of the lines when the
    order is done.
    """
    operands = tuple(clause[1:])
    on_lines = [('lines.unit_loads', ) + operands]
    on_done_lines = [
        ('state', '=', 'done'),
        ('lines.loaded_unit_loads', ) + operands,
    ]
    return ['OR', on_lines, on_done_lines]
|
|
|
|
def get_carrier_amount(self, name=None):
    """Return this order's share of the load price, by UL quantity.

    The share is ``order ULs / load ULs`` applied to the load unit price and
    rounded with the load currency. Returns 0 when any of the three values
    is missing or zero.
    """
    load = self.load
    if not (load.unit_price and self.ul_quantity and load.ul_quantity):
        return 0
    ratio = Decimal(self.ul_quantity) / Decimal(load.ul_quantity)
    return load.currency.round(ratio * load.unit_price)
|
|
|
|
@classmethod
def cancel(cls, records):
    """Cancel the orders; refused with ``UserError`` if any holds ULs."""
    for record in records:
        if record.unit_loads:
            raise UserError(gettext(
                'carrier_load_ul.msg_carrier_load_order_cancel_ul'))
    super(LoadOrder, cls).cancel(records)
|
|
|
|
def _get_load_sale(self, Sale):
    """Return the sale built by the parent with a manual shipment method."""
    sale = super(LoadOrder, self)._get_load_sale(Sale)
    sale.shipment_method = 'manual'
    return sale
|
|
|
|
def _get_load_sale_line(self, sale, key, grouped_items):
    """Return the parent's sale line completed with UL and case quantities.

    ``ul_quantity`` is the number of grouped items. ``cases_quantity`` is
    the sum of their cases rounded with the 'Unit' UoM.
    """
    sale_line = super(LoadOrder, self)._get_load_sale_line(
        sale, key, grouped_items)

    pool = Pool()
    ModelData = pool.get('ir.model.data')
    Uom = pool.get('product.uom')
    unit = Uom(ModelData.get_id('product', 'uom_unit'))

    sale_line.ul_quantity = len(grouped_items)
    total_cases = sum(item.cases_quantity for item in grouped_items)
    sale_line.cases_quantity = unit.round(total_cases)
    return sale_line
|
|
|
|
def _get_shipment_out(self, sale):
    """Return the parent's customer shipment with its dates adjusted.

    The start date is the latest of the order start date and the end date
    of the first of ``last_moves`` of every UL. The end date is the order's.
    With no ULs the order start date is used (the previous nested ``max``
    raised ``ValueError`` on an empty list).
    """
    res = super(LoadOrder, self)._get_shipment_out(sale)
    ul_dates = [ul.last_moves[0].end_date for ul in self.unit_loads]
    res.start_date = max([self.start_date] + ul_dates)
    res.on_change_start_date()
    res.end_date = self.end_date
    return res
|
|
|
|
def _get_shipment_internal(self):
    """Return the parent's internal shipment leaving from the ULs' warehouse.

    When the order has ULs they must all be at the same warehouse, whose
    storage location becomes the shipment origin; otherwise ``UserError``.
    """
    shipment = super()._get_shipment_internal()
    if not self.unit_loads:
        return shipment

    warehouses = {ul.at_warehouse for ul in self.unit_loads}
    if len(warehouses) > 1:
        raise UserError(gettext(
            'carrier_load_ul.msg_carrier_load_order_many_ul_locations',
            order=self.rec_name))
    (warehouse,) = warehouses
    shipment.from_location = warehouse.storage_location
    return shipment
|
|
|
|
def _get_shipment_moves(self, origin, grouped_items):
    """Build the stock moves needed to ship ``grouped_items`` (ULs).

    For 'out' orders the locations come from ``origin``; the move of the
    UL's own product gets ``origin`` as origin and the remaining moves get
    the UL load line. A remaining move whose outgoing location differs
    from its destination is redirected, saved here and left out of the
    result. For 'internal' orders every new move is returned.

    Returns the unsaved moves to be linked to the shipment.
    """
    Move = Pool().get('stock.move')

    shipment_moves = []
    detached_moves = []
    if self.type == 'out':
        from_location = origin.from_location
        to_location = origin.to_location
    elif self.type == 'internal':
        to_location = self.to_location

    for unit_load in grouped_items:
        if self.type == 'internal':
            # internal loads leave from wherever each UL currently is
            from_location = unit_load.at_warehouse.storage_location
        unit_load.check_to_move(from_location, to_location, self.end_date)
        move_values = {
            'from_location': from_location.id,
            'to_location': to_location.id,
            'start_date': self.end_date,
            'end_date': self.end_date,
            'state': 'draft'}
        new_moves = unit_load._get_new_moves(move_values)

        if self.type == 'out':
            # exactly one move is expected for the UL's main product
            main_move, = [
                m for m in new_moves if m.product == unit_load.product]
            main_move.origin = origin
            shipment_moves.append(main_move)

            for extra_move in new_moves:
                if extra_move.product.id == unit_load.product.id:
                    continue
                out_location = self._get_outgoing_moves_location(extra_move)
                extra_move.origin = unit_load.load_line
                if out_location and out_location != extra_move.to_location:
                    extra_move.to_location = out_location
                    # if location differs from origin it cannot be linked
                    # to shipment
                    detached_moves.append(extra_move)
                else:
                    # otherwise move will have origin and shipment
                    shipment_moves.append(extra_move)
        elif self.type == 'internal':
            shipment_moves.extend(new_moves)

    if detached_moves:
        Move.save(detached_moves)
    return shipment_moves
|
|
|
|
def _get_outgoing_moves_location(self, move):
|
|
if move and move.to_location and move.to_location.type == 'customer':
|
|
return move.to_location
|
|
if self.party:
|
|
return self.party.customer_location
|
|
|
|
def _update_sale(self, uls):
|
|
pool = Pool()
|
|
Move = pool.get('stock.move')
|
|
Saleline = pool.get('sale.line')
|
|
|
|
assert (all(ul in self.unit_loads for ul in uls) or
|
|
all(ul not in self.unit_loads for ul in uls))
|
|
_add = uls[0] not in self.unit_loads
|
|
|
|
if not self.shipment:
|
|
return
|
|
if self.type == 'out':
|
|
if not self.sale:
|
|
return
|
|
if self.state != 'done':
|
|
return
|
|
elif self.sale.state not in ('draft', 'quotation'):
|
|
raise UserError(gettext(
|
|
'carrier_load_ul.msg_carrier_load_order_sale_confirmed',
|
|
sale=self.sale.rec_name))
|
|
|
|
keyfunc = partial(self._group_sale_line_key, uls)
|
|
items = sorted(uls, key=keyfunc)
|
|
for key, grouped_items in groupby(items, key=keyfunc):
|
|
_groupitems = list(grouped_items)
|
|
key_dict = dict(key)
|
|
_fields = list(key_dict.keys())
|
|
|
|
def get_line_values(line):
|
|
line_values = []
|
|
for _field in _fields:
|
|
value = getattr(line, _field, None)
|
|
if isinstance(value, Model):
|
|
value = int(value)
|
|
line_values.append(value)
|
|
return line_values
|
|
|
|
sale_line = None
|
|
if self.sale:
|
|
sale_line = [l for l in self.sale.lines
|
|
if get_line_values(l) == list(key_dict.values())]
|
|
|
|
if self.type == 'out':
|
|
if not sale_line:
|
|
if not _add:
|
|
continue
|
|
sale_line = self._get_load_sale_line(self.sale, key,
|
|
_groupitems)
|
|
sale_line.sale = self.sale
|
|
else:
|
|
sale_line, = sale_line
|
|
self._update_sale_line(sale_line, _groupitems, _add)
|
|
sale_line.save()
|
|
|
|
shipment = self.shipment
|
|
|
|
if _add:
|
|
outgoing_moves = self._get_shipment_moves(sale_line,
|
|
_groupitems)
|
|
inventory_moves = []
|
|
for move in outgoing_moves:
|
|
move.shipment = shipment
|
|
if self.type == 'out':
|
|
_inventory = shipment._get_inventory_move(move)
|
|
_inventory.start_date = shipment.start_date
|
|
inventory_moves.append(_inventory)
|
|
Move.save(outgoing_moves + inventory_moves)
|
|
Move.assign(outgoing_moves)
|
|
if self.type == 'out':
|
|
Move.do(inventory_moves)
|
|
if shipment.state == 'done':
|
|
Move.do(outgoing_moves)
|
|
else:
|
|
shipment_moves = [m for m in shipment.moves
|
|
if m.unit_load in _groupitems]
|
|
load_moves = [m for m in self.inventory_moves
|
|
if m.unit_load in _groupitems and not m.shipment]
|
|
load_moves += [m for m in self.outgoing_moves
|
|
if m.unit_load in _groupitems and not m.shipment]
|
|
with Transaction().set_context(check_origin=False,
|
|
check_shipment=False):
|
|
Move.cancel(shipment_moves + load_moves)
|
|
Move.delete(shipment_moves + load_moves)
|
|
if sale_line and not sale_line.quantity:
|
|
Saleline.delete([sale_line])
|
|
|
|
to_do = []
|
|
for move in self.outgoing_moves:
|
|
if move.state == 'draft' and not move.shipment:
|
|
_new_move = self._get_inventory_move(move)
|
|
_new_move.save()
|
|
to_do.extend([move, _new_move])
|
|
if to_do:
|
|
Move.do(to_do)
|
|
|
|
def _update_sale_line(self, sale_line, items, _add=True):
|
|
_sign = 1 if _add else -1
|
|
sale_line.ul_quantity += _sign * len(items)
|
|
sale_line.cases_quantity += _sign * sum(
|
|
item.cases_quantity for item in items)
|
|
sale_line.quantity += _sign * sale_line.unit.round(
|
|
sum(sale_line.unit.compute_qty(item.uom, item.quantity,
|
|
sale_line.unit) for item in items))
|
|
return True
|
|
|
|
def _get_items(self):
|
|
return self.unit_loads
|
|
|
|
def get_ul_quantity(self, name=None):
    """Return the planned number of ULs of the order.

    It is the sum of the lines' ``ul_quantity``, or ``None`` as soon as one
    line has no (or zero) UL quantity. An order without lines yields 0.
    """
    quantities = [line.ul_quantity for line in self.lines]
    if not all(quantities):
        return None
    return sum(quantities)
|
|
|
|
@classmethod
def run(cls, records):
    """Run the orders, first reverting the loaded ULs of the done ones."""
    already_done = [r for r in records if r.state == 'done']
    if already_done:
        # move ULs back from ``loaded_unit_loads`` to ``unit_loads``
        cls._set_loaded_unit_loads(already_done, revert=True)

    super().run(records)
|
|
|
|
@classmethod
@ModelView.button
@Workflow.transition('done')
def do(cls, records):
    """Finish the orders.

    Checks the loaded quantities, runs the parent transition, records the
    loaded ULs on the lines, creates inventory moves for outgoing moves
    without shipment (only for 'out' orders whose warehouse has distinct
    output and storage locations) and finally does every line move that
    is not linked to a shipment.
    """
    Move = Pool().get('stock.move')

    cls._check_loaded_quantity(records)
    super(LoadOrder, cls).do(records)
    cls._set_loaded_unit_loads(records)

    inventory_moves = []
    for record in records:
        if record.type != 'out':
            continue
        warehouse = record.warehouse
        if warehouse.output_location == warehouse.storage_location:
            continue
        for outgoing in record.outgoing_moves:
            if not outgoing.shipment:
                inventory_moves.append(record._get_inventory_move(outgoing))
    if inventory_moves:
        Move.save(inventory_moves)

    pending = [
        move
        for record in records
        for line in record.lines
        for move in line.moves
        if not move.shipment]
    Move.do(pending)
|
|
|
|
@classmethod
def _check_loaded_quantity(cls, records):
    """Check that each order has loaded what was planned.

    Orders without a planned UL quantity are skipped. An order with a
    planned quantity but no ULs raises ``UserError``. For each magnitude
    enabled in the configuration ``quantity_check`` ('uls', 'cases'), a
    shortfall raises ``UserWarning`` for users in the
    'group_do_load_pending_quantity' group and ``UserError`` otherwise.
    """
    pool = Pool()
    Configuration = pool.get('carrier.configuration')
    ModelData = pool.get('ir.model.data')
    User = pool.get('res.user')
    Group = pool.get('res.group')
    UserWarningModel = pool.get('res.user.warning')

    conf = Configuration(1)

    def in_group():
        # True when the acting user may finish loads with pending quantity
        group = Group(ModelData.get_id('carrier_load_ul',
            'group_do_load_pending_quantity'))
        transaction = Transaction()
        user_id = transaction.user
        if user_id == 0:
            # user 0: fall back to the user given in the context, if any
            user_id = transaction.context.get('user', user_id)
        if user_id == 0:
            return True
        user = User(user_id)
        return group in user.groups

    def complain(name_pattern, record, message):
        # warning for allowed users, hard error for the rest
        if not in_group():
            raise UserError(message)
        warning_name = name_pattern % (record.origins or record.id)
        if UserWarningModel.check(warning_name):
            raise UserWarning(warning_name, message)

    for record in records:
        if not record.ul_quantity:
            continue
        if not record.unit_loads:
            raise UserError(gettext(
                'carrier_load_ul.msg_carrier_load_order_no_uls',
                order=record.rec_name))

        if ('uls' in conf.quantity_check
                and record.ul_quantity > record.loaded_uls):
            message = gettext('carrier_load_ul.'
                'msg_carrier_load_order_pending_uls',
                uls=record.loaded_uls,
                ul_quantity=int(record.ul_quantity))
            complain('pending_uls_%s', record, message)

        if ('cases' in conf.quantity_check
                and record.cases_quantity > record.loaded_cases):
            message = gettext(
                'carrier_load_ul.'
                'msg_carrier_load_order_pending_cases',
                loaded_cases=record.loaded_cases,
                cases_quantity=record.cases_quantity)
            complain('pending_cases_%s', record, message)
|
|
|
|
@classmethod
def _set_loaded_unit_loads(cls, records, revert=False):
    """Move the ULs of each line between ``unit_loads`` and
    ``loaded_unit_loads``.

    By default the ULs in ``unit_loads`` are added to ``loaded_unit_loads``
    and removed from ``unit_loads``; with ``revert`` the opposite happens.
    All lines are written in a single ``write`` call.
    """
    OrderLine = Pool().get('carrier.load.order.line')

    if revert:
        source_field = 'loaded_unit_loads'
        loaded_op, pending_op = 'remove', 'add'
    else:
        source_field = 'unit_loads'
        loaded_op, pending_op = 'add', 'remove'

    write_args = []
    for record in records:
        for line in record.lines:
            ul_ids = [ul.id for ul in getattr(line, source_field, [])]
            write_args.append([line])
            write_args.append({
                'loaded_unit_loads': [(loaded_op, ul_ids)],
                'unit_loads': [(pending_op, ul_ids)],
                })
    if write_args:
        OrderLine.write(*write_args)
|
|
|
|
@classmethod
def draft(cls, records):
    """Reset the orders to draft; refused for waiting orders holding ULs."""
    for record in records:
        if record.state == 'waiting' and record.unit_loads:
            raise UserError(gettext(
                'carrier_load_ul.msg_carrier_load_order_draft_ul',
                order=record.rec_name))
    super(LoadOrder, cls).draft(records)
|
|
|
|
@classmethod
def delete(cls, records):
    """Delete the orders and restore the shipment method of their sales.

    Only sales that are not flagged ``is_origin_load`` are restored; they
    are collected before the deletion.
    """
    Sale = Pool().get('sale.sale')

    sales_to_restore = []
    for record in records:
        if record.sale and not record.sale.is_origin_load:
            sales_to_restore.append(record.sale)

    super().delete(records)

    if sales_to_restore:
        Sale.restore_load_shipment_method(sales_to_restore)
|
|
|
|
def _get_inventory_move(self, move):
    """Return an unsaved move bringing the product to ``move.from_location``.

    The source is the 'storage' location reported by the UL for the moved
    product. The start date is the shipment's when the move has one,
    otherwise the order's; the rest is copied from ``move``.
    """
    Move = Pool().get('stock.move')

    unit_load = move.unit_load
    locations = unit_load.get_location(
        [unit_load], product_id=move.product.id, type='storage')
    if move.shipment:
        start_date = move.shipment.start_date
    else:
        start_date = self.start_date

    return Move(
        from_location=locations[unit_load.id],
        to_location=move.from_location,
        product=move.product,
        uom=move.uom,
        quantity=move.quantity,
        start_date=start_date,
        end_date=self.end_date,
        company=move.company,
        currency=move.currency,
        unit_price=move.unit_price,
        unit_load=move.unit_load,
        origin=move.origin,
        )
|
|
|
|
@classmethod
@ModelView.button_action('carrier_load_ul.wizard_load_ul')
def run_try(cls, records):
    """Button hook: only launches the 'wizard_load_ul' action."""
    return None
|
|
|
|
def get_failed_uls(self, unit_loads):
    """Return the ULs whose data matches none of their origin lines."""
    return [
        unit_load for unit_load in unit_loads
        if not self.check_ul_data_match(
            self._get_lines_from_origin_uls(unit_load), unit_load)]
|
|
|
|
def add_ul(self, unit_loads, origin_restrict=None,
        origin_restrict_warn=True, force=False):
    """Load ``unit_loads`` into this order.

    Each UL is validated (state, not already loaded, in a storage
    location, in the load warehouse), matched against the order lines and
    assigned to the first line that still has room.

    ``origin_restrict`` defaults to the order's ``ul_origin_restrict``.

    Returns the list of ULs whose data matched no line (nothing is saved
    in that case), or ``None`` once the ULs are saved.

    Raises ``UserError`` on a failed validation, ``AddUnitLoadError`` when
    ``force`` is set and a UL matches no line, and
    ``AddUnitLoadOverloadError`` when no matching line has room left.
    """
    UL = Pool().get('stock.unit_load')

    if origin_restrict is None:
        origin_restrict = self.ul_origin_restrict

    # ULs assigned so far to each line id, shared across iterations
    assigned_by_line = {}
    failed_uls = []
    for unit_load in unit_loads:
        # check state
        self.check_add_unit_load_state(unit_load)

        # check it is not loaded yet
        if unit_load.load_line:
            raise UserError(gettext(
                'carrier_load_ul.msg_carrier_load_order_ul_loaded',
                unit_load=unit_load.rec_name))

        # check it is in storage location
        if unit_load.location.type != 'storage':
            raise UserError(gettext(
                'carrier_load_ul.msg_carrier_load_order_ul_location',
                unit_load=unit_load.rec_name))

        # check it is in warehouse
        self.check_add_unit_load_warehouse(unit_load)

        # check it is linked to origin lines
        candidate_lines = self._get_lines_from_origin_uls(unit_load)
        if not candidate_lines:
            candidate_lines = self.check_origin_restrict(
                unit_load, origin_restrict, origin_restrict_warn)

        # check data matches
        matched_lines = self.check_ul_data_match(
            candidate_lines, unit_load, force=force)
        if not matched_lines:
            if force:
                raise AddUnitLoadError(gettext(
                    'carrier_load_ul.'
                    'msg_carrier_load_order_ul_cannot_force',
                    unit_load=unit_load.rec_name,
                    order=self.rec_name))
            failed_uls.append(unit_load)

        # check overload line qty
        target_line = self._choose_matched_line(
            matched_lines, assigned_by_line, unit_load)
        if target_line:
            # load UL
            unit_load.load_line = target_line
        elif unit_load not in failed_uls:
            raise AddUnitLoadOverloadError(gettext(
                'carrier_load_ul.msg_carrier_load_order_ul_overload',
                order=self.rec_name,
                unit_load=unit_load.code))

    if failed_uls:
        return failed_uls
    self._update_sale(unit_loads)

    if self.state == 'done':
        # on a done order the ULs go to the ``load_lines`` history
        # instead of staying as the current load line
        for unit_load in unit_loads:
            unit_load.load_lines = unit_load.load_lines + (
                unit_load.load_line,)
            unit_load.load_line = None

    UL.save(unit_loads)
|
|
|
|
def check_add_unit_load_state(self, unit_load):
    """Raise ``UserError`` unless ``unit_load`` is in 'done' state."""
    if unit_load.state == 'done':
        return
    raise UserError(gettext(
        'carrier_load_ul.msg_carrier_load_order_ul_state',
        unit_load=unit_load.rec_name))
|
|
|
|
def check_add_unit_load_warehouse(self, unit_load):
    """Raise ``UserError`` unless ``unit_load`` is at the load warehouse."""
    current = unit_load.at_warehouse
    if current and current.id == self.load.warehouse.id:
        return
    raise UserError(gettext(
        'carrier_load_ul.msg_carrier_load_order_ul_warehouse',
        unit_load=unit_load.rec_name,
        warehouse=self.load.warehouse.rec_name,
        order=self.rec_name))
|
|
|
|
def _choose_matched_line(self, lines, values, unit_load):
    """Pick the first of ``lines`` with room left for ``unit_load``.

    ``values`` maps line ids to the set of ULs already counted for the
    line; it is initialised from the line on first use and updated with
    ``unit_load`` when a line is chosen. Room is evaluated against the
    magnitudes enabled in the configuration ``quantity_check``.

    Returns the chosen line or ``None``.
    """
    Configuration = Pool().get('carrier.configuration')
    configuration = Configuration(1)

    chosen = None
    for candidate in lines:
        if candidate.id not in values:
            values[candidate.id] = set(candidate.get_unit_loads())
        counted = values[candidate.id]
        checks = configuration.quantity_check

        if ('uls' in checks and candidate.ul_quantity
                and candidate.ul_quantity - len(counted) <= 0):
            # planned ULs already reached
            continue
        if 'cases' in checks and candidate.cases_quantity:
            counted_cases = sum(ul.cases_quantity for ul in counted)
            remaining = round(
                candidate.cases_quantity - counted_cases, cases_digits[1])
            if remaining <= 0:
                # planned cases already reached
                continue
        chosen = candidate
        break

    if chosen:
        values[chosen.id].add(unit_load)
    return chosen
|
|
|
|
def _get_origin_restrict_warning_id(self, unit_load):
|
|
return unit_load.id
|
|
|
|
def check_origin_restrict(self, unit_load, origin_restrict,
        origin_restrict_warn):
    """Return the order lines ``unit_load`` may be loaded on.

    A line whose origin model is not listed by ``valid_origin_restrict``
    raises ``AddUnitLoadOriginError`` when ``origin_restrict`` is set.
    Otherwise it is kept, and ``AddUnitLoadOriginWarning`` is raised once
    when ``origin_restrict_warn`` is set and the warning is still pending.
    """
    UserWarningModel = Pool().get('res.user.warning')

    allowed_lines = []
    must_warn = False
    for line in self.lines:
        restricted = (
            bool(line.origin)
            and line.origin.__name__ not in self.valid_origin_restrict())
        if restricted:
            if origin_restrict:
                raise AddUnitLoadOriginError(gettext(
                    'carrier_load_ul.msg_carrier_load_order_ul_origin',
                    unit_load=unit_load.rec_name))
            if origin_restrict_warn:
                must_warn = True
        allowed_lines.append(line)

    if must_warn:
        warning_name = ('loading_ul_origin_%s'
            % self._get_origin_restrict_warning_id(unit_load))
        if UserWarningModel.check(warning_name):
            raise AddUnitLoadOriginWarning(warning_name, gettext(
                'carrier_load_ul.msg_carrier_load_order_ul_origin',
                unit_load=unit_load.rec_name))
    return allowed_lines
|
|
|
|
@classmethod
def valid_origin_restrict(cls):
    """Return the origin model names accepted by the origin restriction."""
    accepted = ['sale.line']
    return accepted
|
|
|
|
def _get_lines_from_origin_uls(self, unit_load):
|
|
return [l for l in self.lines if l.origin and
|
|
unit_load in getattr(l.origin, 'unit_loads', [])]
|
|
|
|
def check_ul_data_match(self, lines, unit_load, force=False):
    """Return the ``lines`` compatible with ``unit_load``.

    A line is compatible when its origin defines no product, when it has
    no origin and ``force`` is not set, or when the origin product is the
    product of the UL.
    """
    def is_compatible(line):
        product = getattr(line.origin, 'product', None)
        if not product:
            return True
        if not force and not line.origin:
            return True
        return product.id == unit_load.product.id

    return [line for line in lines if is_compatible(line)]
|
|
|
|
@classmethod
def _group_line_key(cls, items, item):
    """Return the parent grouping key, extended for sale line origins.

    When the load line of ``item`` originates from a sale line, the pair
    ``('id', <sale line id>)`` is appended to the key.
    """
    key = super(LoadOrder, cls)._group_line_key(items, item)
    load_line = item.load_line
    from_sale_line = (
        load_line and load_line.origin
        and load_line.origin.__name__ == 'sale.line')
    if from_sale_line:
        key = key + (('id', item.load_line.origin.id), )
    return key
|
|
|
|
@classmethod
|
|
def _group_sale_line_key(cls, items, item):
|
|
if getattr(item, 'sale_line', None):
|
|
return (('id', item.sale_line.id), )
|
|
return cls._group_line_key(items, item)
|
|
|
|
def unload_ul(self, unit_loads):
    """Unload ``unit_loads`` from this order, updating sale and moves first."""
    pool = Pool()
    UnitLoad = pool.get('stock.unit_load')

    # must run while the ULs still belong to the order
    self._update_sale(unit_loads)
    UnitLoad.unload(unit_loads, load_order=self)
|
|
|
|
|
|
class LoadOrderLine(metaclass=PoolMeta):
    """Load order line extended with unit load (UL) quantities."""
    __name__ = 'carrier.load.order.line'

    # Mirror of the configuration ``quantity_check``, used by the domain of
    # ``ul_quantity``.
    quantity_check = fields.Function(
        fields.MultiSelection('_quantity_check_selection', "Quantity Check"),
        'get_quantity_check')
    # Planned ULs; when ULs are checked and a quantity is set it cannot be
    # lower than the ULs already loaded.
    ul_quantity = fields.Float('ULs', digits=(16, 0),
        domain=[If(And(
            Eval('quantity_check', []).contains('uls'),
            Eval('ul_quantity')),
            ('ul_quantity', '>=', Eval('loaded_uls')),
            ())],
        depends=['loaded_uls', 'quantity_check', 'ul_quantity'])
    quantity_per_ul = fields.Function(
        fields.Float('Quantity per UL', digits=(16, Eval('unit_digits', 0)),
            depends=['unit_digits']),
        'on_change_with_quantity_per_ul')
    # ULs currently on the line (hidden once the order is done).
    unit_loads = fields.One2Many('stock.unit_load', 'load_line', 'Unit loads',
        readonly=True, states={
            'invisible': Eval('order_state') == 'done'
        }, depends=['order_state'])
    loaded_uls = fields.Function(fields.Float('Loaded ULs', digits=(16, 0)),
        'get_loaded_uls')
    # ULs recorded as loaded (shown only once the order is done).
    loaded_unit_loads = fields.Many2Many(
        'carrier.load.order.line-stock.unit_load', 'load_line', 'unit_load',
        'Loaded unit loads', readonly=True,
        states={
            'invisible': Eval('order_state') != 'done'
        }, depends=['order_state'])
    loaded_cases = fields.Function(
        fields.Float("Loaded Cases", digits=cases_digits),
        'get_loaded_cases')
    cases_quantity = fields.Function(
        fields.Float("Cases", digits=cases_digits),
        'get_cases_quantity')

    @fields.depends('quantity', 'ul_quantity', 'uom')
    def on_change_with_quantity_per_ul(self, name=None):
        """Return ``quantity / ul_quantity`` rounded with the line UoM.

        Returns ``None`` when quantity, UL quantity or UoM is missing.
        """
        if self.quantity and self.ul_quantity and self.uom:
            return self.uom.round(self.quantity / self.ul_quantity)
        return None

    @classmethod
    def _get_quantity_field(cls):
        """Return the name of the field holding the line quantity."""
        return 'ul_quantity'

    @classmethod
    def default_loaded_uls(cls):
        """A new line has no loaded ULs."""
        return 0

    @classmethod
    def _quantity_check_selection(cls):
        """Reuse the selection of the configuration ``quantity_check``."""
        Configuration = Pool().get('carrier.configuration')
        return Configuration.quantity_check.selection

    def get_quantity_check(self, name=None):
        """Return the ``quantity_check`` of the configuration record."""
        Configuration = Pool().get('carrier.configuration')
        return Configuration(1).quantity_check

    def get_loaded_cases(self, name=None):
        """Return the cases of the loaded ULs (done) or current ULs."""
        if self.order_state == 'done':
            return sum(ul.cases_quantity for ul in self.loaded_unit_loads)
        return sum(ul.cases_quantity for ul in self.unit_loads)

    def get_cases_quantity(self, name=None):
        """Return the planned cases: origin cases per UL times planned ULs.

        A missing origin, a missing ``ul_cases_quantity`` on it or an unset
        ``ul_quantity`` count as 0 (the latter used to raise ``TypeError``).
        """
        ul_cases = getattr(self.origin, 'ul_cases_quantity', None) or 0
        # we use origin.ul_cases_quantity instead of origin.cases_quantity
        # because we can define less ULs in load than origin
        return round(ul_cases * (self.ul_quantity or 0), cases_digits[1])

    def get_loaded_uls(self, name=None):
        """Return the number of loaded ULs (done) or current ULs."""
        if self.order_state == 'done':
            return len(self.loaded_unit_loads or [])
        return len(self.unit_loads or [])

    @classmethod
    def _get_origin(cls):
        """Allow sale lines as origin of load order lines."""
        res = super(LoadOrderLine, cls)._get_origin()
        res.append('sale.line')
        return res

    @classmethod
    def copy(cls, records, default=None):
        """Copy the lines without their (loaded) unit loads."""
        default = {} if default is None else default.copy()
        default['unit_loads'] = None
        default['loaded_unit_loads'] = None
        return super().copy(records, default=default)

    def get_unit_loads(self):
        """Return the list of ULs of the line."""
        # loaded_unit_loads is set just after the order is done, so we
        # must check whether it has a value and otherwise return the other
        # list, to allow using a trigger for sending email
        if self.order.state == 'done' and self.loaded_unit_loads:
            return list(self.loaded_unit_loads)
        return list(self.unit_loads)
|
|
|
|
|
|
class LoadUnitLoadOrder(ModelView):
    """Carrier load unit load"""
    __name__ = 'carrier.load_uls.order'

    # Order to load ULs on; only waiting or running orders can be chosen.
    load_order = fields.Many2One(
        'carrier.load.order', 'Order',
        required=True,
        domain=[
            ('state', 'in', ['waiting', 'running']),
        ])
|
|
|
|
|
|
# TODO: configure ul_code reading by a string pattern (ex: P${code})
# in carrier.configuration to read it with a barcode scanner
class LoadUnitLoadData(ModelView):
    """Carrier load unit load"""
    __name__ = 'carrier.load_uls.data'

    load_order = fields.Many2One(
        'carrier.load.order', 'Order',
        readonly=True,
        depends=['standalone', 'order_state'])
    order_state = fields.Char('State', readonly=True)
    standalone = fields.Boolean('Standalone', readonly=True)
    # Code of a single UL to load.
    ul_code = fields.Char('UL')
    at_warehouse = fields.Many2One('stock.location', 'Warehouse at date')
    # ULs picked from a list: done, produced, available and located at
    # ``at_warehouse``.
    uls_to_load = fields.One2Many(
        'stock.unit_load', None, 'ULs to load',
        domain=[
            ('state', '=', 'done'),
            ('production_state', '=', 'done'),
            ('available', '=', True),
            ('at_warehouse', '=', Eval('at_warehouse')),
        ],
        depends=['at_warehouse'])
    uls_loaded = fields.Many2Many(
        'stock.unit_load', None, None, 'Loaded ULs',
        domain=[('id', 'in', Eval('uls_loaded_domain'))],
        context={'ul_extended_rec_name': True},
        depends=['load_order', 'uls_loaded_domain'])
    # Helper holding the ULs allowed in ``uls_loaded``.
    uls_loaded_domain = fields.One2Many(
        'stock.unit_load', None, 'ULs loaded Domain')
    loaded_uls = fields.Float(
        'Loaded ULs', digits=(16, 0), readonly=True)
|
|
|
|
|
|
class LoadUnitLoadFailed(ModelView):
    """Carrier load unit load failed"""
    __name__ = 'carrier.load_uls.failed'

    # ULs that could not be loaded, shown read-only.
    failed_uls = fields.One2Many(
        'stock.unit_load', None, 'Failed ULs', readonly=True)
|
|
|
|
|
|
class LoadUnitLoad(Wizard):
    """Carrier load unit load"""
    __name__ = 'carrier.load_uls'

    start = StateTransition()
    # Order selection form.
    order = StateView(
        'carrier.load_uls.order',
        'carrier_load_ul.load_uls_order_view_form',
        [
            Button('Cancel', 'end', 'tryton-cancel'),
            Button('OK', 'post_order', 'tryton-ok', default=True),
        ])
    post_order = StateTransition()
    # Main loading form; 'Do' is disabled while there is something pending
    # to load and hidden once the order is done.
    data = StateView(
        'carrier.load_uls.data',
        'carrier_load_ul.load_uls_data_view_form',
        [
            Button('Cancel', 'exit', 'tryton-cancel'),
            Button('Unload ULs', 'unload_', 'tryton-undo'),
            Button('Do', 'pre_do', 'tryton-ok', states={
                'readonly': Eval('ul_code') | Eval('uls_to_load'),
                'invisible': (Eval('order_state') == 'done')}),
            Button('Load', 'load_', 'tryton-add', default=True),
        ])
    # Form listing failed ULs; forcing is reserved to 'group_force_load'.
    failed = StateView(
        'carrier.load_uls.failed',
        'carrier_load_ul.load_uls_failed_view_form',
        [
            Button('Force load', 'force', 'tryton-forward',
                states={
                    'readonly': ~Id('carrier_load_ul',
                        'group_force_load').in_(
                        Eval('context', {}).get('groups', [])),
                }),
            Button('Accept', 'not_force', 'tryton-ok', True),
        ])
    not_force = StateTransition()
    force = StateTransition()
    load_ = StateTransition()
    unload_ = StateTransition()
    pre_do = StateTransition()
    do_ = StateAction('carrier_load.wizard_load_order_do')
    open_ = StateAction('carrier_load.act_load_order')
    exit = StateTransition()
|
|
|
|
def transition_start(self):
    """Choose the first state of the wizard.

    Returns 'order' when not launched from a load order. For a waiting
    order that already holds ULs the order is run and the wizard ends;
    otherwise returns 'data'.

    Raises ``UserError`` when launched through the 'wizard_force_load_ul'
    action on an internal order.
    """
    pool = Pool()
    Loadorder = pool.get('carrier.load.order')
    ModelData = pool.get('ir.model.data')

    force_action_id = ModelData.get_id(
        'carrier_load_ul', 'wizard_force_load_ul')

    context = Transaction().context
    if context.get('active_model') != LoadOrder.__name__:
        return 'order'

    order = Loadorder(context['active_id'])
    launched_by_force = context.get('action_id') == force_action_id
    if launched_by_force and order.type == 'internal':
        raise UserError(gettext(
            'carrier_load_ul.msg_carrier_load_uls_internal_type',
            order=order.rec_name))

    if order.state == 'waiting' and len(order.unit_loads) > 0:
        Loadorder.run([order])
        return 'end'
    return 'data'
|
|
|
|
def transition_post_order(self):
    """After choosing the order, go to the loading form."""
    next_state = 'data'
    return next_state
|
|
|
|
def default_data(self, fields):
    """Return the initial values of the loading form for the current order."""
    order, standalone = self._get_load_order()
    loaded_ids = [int(ul) for ul in order.unit_loads]
    loaded_count = len(order.unit_loads) if order.unit_loads else 0
    return {
        'load_order': order.id,
        'loaded_uls': loaded_count,
        'standalone': standalone,
        'uls_loaded_domain': loaded_ids,
        'order_state': order.state,
        'at_warehouse': order.warehouse.id,
    }
|
|
|
|
def load(self, origin_restrict=True, origin_restrict_warn=True):
    """Add the requested unit loads to the load order.

    The unit loads come from ``data.uls_to_load`` plus, when
    ``data.ul_code`` is filled, the first unit load found for that code.

    Args:
        origin_restrict: accepted but not forwarded by this method.
        origin_restrict_warn: forwarded to ``order.add_ul``.

    Returns:
        The truthy value returned by ``order.add_ul`` when some unit
        loads failed; otherwise None, after running the order if it is in
        'draft' or 'waiting' state.

    Raises:
        UserError: when neither a code nor unit loads were given.
    """
    pool = Pool()
    Order = pool.get('carrier.load.order')

    order_ref, _ = self._get_load_order()
    selected = self.data.uls_to_load
    uls = [] if selected is None else list(selected)

    code = self.data.ul_code
    if not code and not uls:
        raise UserError(gettext(
            'carrier_load_ul.msg_carrier_load_uls_ul_required'))

    # _get_load_order may hand back either an id or a record.
    order = Order(order_ref) if isinstance(order_ref, int) else order_ref

    if code:
        uls.append(self._get_unit_load_by_code(code)[0])
    if uls:
        failed_uls = order.add_ul(
            uls, origin_restrict_warn=origin_restrict_warn)
        if failed_uls:
            return failed_uls
        if order.state in ('draft', 'waiting'):
            order.run([order])
|
|
|
|
def transition_load_(self):
    """Load the unit loads; go to 'failed' if any was rejected, else 'data'."""
    return 'failed' if self.load() else 'data'
|
|
|
|
def default_failed(self, fields):
    """Return the ids of the unit loads that the order reports as failed.

    The candidates are the unit loads matching ``data.ul_code`` when a
    code was typed, otherwise ``data.uls_to_load``.
    """
    order, _ = self._get_load_order()
    code = self.data.ul_code
    if code:
        candidates = self._get_unit_load_by_code(code)
    else:
        candidates = self.data.uls_to_load
    failed_ids = [ul.id for ul in order.get_failed_uls(candidates)]
    return {'failed_uls': failed_ids}
|
|
|
|
def transition_not_force(self):
    """Go back to the 'data' state without forcing the load."""
    next_state = 'data'
    return next_state
|
|
|
|
def transition_pre_do(self):
    """Continue to the 'do_' action state."""
    next_state = 'do_'
    return next_state
|
|
|
|
def do_do_(self, action):
    """Return the action (and its data) used to do the load order.

    When the wizard was not launched from a load order, the given action
    is replaced by the action of the
    ``carrier_load_ul.wizard_load_order_do_open`` wizard.

    Args:
        action: the default action values of the state.

    Returns:
        A tuple ``(action, data)`` where data targets the load order.
    """
    pool = Pool()
    Modeldata = pool.get('ir.model.data')
    ActionWizard = pool.get('ir.action.wizard')
    Action = pool.get('ir.action')

    order, _ = self._get_load_order()
    data = {
        'ids': [order.id],
        'id': order.id,
        'model': order.__name__,
        }
    # .get(): a context without 'active_model' is handled as a standalone
    # run (the same way _get_load_order reads it) instead of a KeyError.
    if Transaction().context.get('active_model') != 'carrier.load.order':
        action_wizard = ActionWizard(Modeldata.get_id(
            'carrier_load_ul', 'wizard_load_order_do_open'))
        action = action_wizard.action
        action = Action.get_action_values(action.type, [action.id])[0]
    return action, data
|
|
|
|
def transition_do_(self):
    """End the wizard once the 'do_' action has been launched."""
    next_state = 'end'
    return next_state
|
|
|
|
def do_open_(self, action):
    """Open the load order, with the action's views in reversed order.

    The 'views' list of ``action`` is reversed in place.
    """
    target, _standalone = self._get_load_order()
    views = action['views']
    views.reverse()
    data = {'res_id': [target.id]}
    return action, data
|
|
|
|
def transition_exit(self):
    """Leave the wizard.

    Returns:
        'open_' when the wizard was not launched from a load order (so
        the order gets opened), 'end' otherwise.
    """
    # .get(): a context without 'active_model' is handled as a standalone
    # run (the same way _get_load_order reads it) instead of a KeyError.
    if Transaction().context.get('active_model') != 'carrier.load.order':
        return 'open_'
    return 'end'
|
|
|
|
def _get_load_order(self):
    """Return ``(order, standalone)`` for the current wizard run.

    ``standalone`` is False when the wizard runs on an active load order
    and True when the order comes from the 'order' view.
    """
    pool = Pool()
    Order = pool.get('carrier.load.order')

    context = Transaction().context
    if context.get('active_model') != LoadOrder.__name__:
        return Order(self.order.load_order.id), True
    return Order(context.get('active_id')), False
|
|
|
|
def unload(self):
    """Remove the unit loads selected in ``data.uls_loaded`` from the order.

    Raises:
        UserError: when no unit load is selected.
    """
    pool = Pool()
    UnitLoad = pool.get('stock.unit_load')
    Order = pool.get('carrier.load.order')

    order_id, _ = self._get_load_order()
    # An unset selection (None) is treated as empty so the user gets the
    # explicit error below instead of a TypeError.
    uls = list(self.data.uls_loaded or [])
    if not uls:
        raise UserError(gettext(
            'carrier_load_ul.msg_carrier_load_uls_unload_any'))
    # _get_load_order may hand back either an id or a record.
    order = Order(order_id) if isinstance(order_id, int) else order_id
    if isinstance(uls[0], int):
        uls = UnitLoad.browse(uls)
    order.unload_ul(uls)
|
|
|
|
def transition_unload_(self):
    """Unload the selected unit loads and return to the 'data' state."""
    self.unload()
    next_state = 'data'
    return next_state
|
|
|
|
def transition_force(self):
    """Force the load of the failed unit loads, then return to 'data'.

    The unit loads of the 'failed' view are added to the order with
    ``force=True`` and without origin-restriction warning.
    """
    rejected = self.failed.failed_uls
    target, _standalone = self._get_load_order()
    target.add_ul(rejected, origin_restrict_warn=False, force=True)
    return 'data'
|
|
|
|
@classmethod
def _get_unit_load_by_code(cls, code):
    """Return the unit loads matching ``code``.

    The search domain is built by the unit load model's
    ``_get_barcode_search_domain``.

    Raises:
        UserError: when no unit load matches.
    """
    UL = Pool().get('stock.unit_load')
    domain = UL._get_barcode_search_domain(code)
    found = UL.search(domain)
    if found:
        return found
    raise UserError(gettext(
        'carrier_load_ul.msg_carrier_load_uls_invalid_ul',
        code=code))
|
|
|
|
|
|
# Extends the 'do load order' wizard so that, when launched through the
# carrier_load_ul.wizard_load_order_do_open action, it opens the order
# instead of ending.
class DoLoadOrder(metaclass=PoolMeta):
    __name__ = 'carrier.load.order.do'

    open_ = StateAction('carrier_load.act_load_order')

    @classmethod
    def next_action(cls, name):
        """Return the state following ``name``.

        The parent's 'end' is replaced by 'open_' when the wizard runs
        under the ``wizard_load_order_do_open`` action.
        """
        Modeldata = Pool().get('ir.model.data')

        next_ = super().next_action(name)
        if next_ == 'end':
            open_action_id = Modeldata.get_id('carrier_load_ul',
                'wizard_load_order_do_open')
            # .get(): a context without 'action_id' must keep the parent
            # behaviour instead of raising KeyError.
            if Transaction().context.get('action_id') == open_action_id:
                return 'open_'
        return next_

    def do_open_(self, action):
        """Open the active load order with the action's views reversed."""
        pool = Pool()
        Order = pool.get('carrier.load.order')

        order = Order(Transaction().context.get('active_id'))
        # Reversed in place, as the action dict is returned to the caller.
        action['views'].reverse()
        return action, {'res_id': [order.id]}
|
|
|
|
|
|
# Extends the load sheet so its lines are computed from the unit loads of
# the order when it has any.
class LoadSheet(metaclass=PoolMeta):
    __name__ = 'carrier.load.sheet'

    @classmethod
    def _get_lines(cls, order):
        """Return the sheet lines of ``order`` keyed by product id.

        Orders without unit loads keep the parent behaviour. Otherwise
        each line counts the unit loads of the product ('ul_quantity') and
        adds up their cases ('cases_quantity').
        """
        if not order.unit_loads:
            return super()._get_lines(order)

        res = {}
        for ul in order.unit_loads:
            key = ul.product.id
            # Build the line dict only the first time a product shows up.
            if key not in res:
                res[key] = cls.get_line_dict(ul.product)
            line = res[key]
            line['ul_quantity'] += 1
            line['cases_quantity'] += ul.cases_quantity
        return res

    @classmethod
    def get_line_dict(cls, item):
        """Return the parent's line dict with zeroed UL and cases counters."""
        res = super().get_line_dict(item)
        res.update({
            'ul_quantity': 0,
            'cases_quantity': 0
            })
        return res
|
|
|
|
|
|
# Mixin for transport reports: takes product origins, weight and packages
# from the unit loads of the order when it has any.
class TransportReportMixin(object):

    @classmethod
    def _get_product_origins(cls, order):
        """Return the unit loads of ``order``, or the parent's origins."""
        if order.unit_loads:
            return order.unit_loads
        return super()._get_product_origins(order)

    @classmethod
    def product_weight(cls, product_key, origins, language):
        """Return the weight in kilograms of ``origins``.

        Returns None when there are no origins, when the first origin has
        no product, or when the product's default unit is not in the
        weight category. Unit load quantities are converted to kilograms
        and added up; other origins are delegated to the parent.
        """
        # Without origins there is no product to inspect.
        if not origins:
            return None

        pool = Pool()
        Uom = pool.get('product.uom')
        Modeldata = pool.get('ir.model.data')

        cat_weight = Modeldata.get_id('product', 'uom_cat_weight')
        kg_uom = Uom(Modeldata.get_id('product', 'uom_kilogram'))

        first = origins[0]
        product = first.product
        if not product or product.default_uom.category.id != cat_weight:
            return None
        if first.__name__ == 'stock.unit_load':
            return sum(
                Uom.compute_qty(ul.uom, ul.quantity, kg_uom) or 0
                for ul in origins)
        return super().product_weight(product_key, origins, language)

    @classmethod
    def product_packages(cls, product_key, origins, language):
        """Return the number of cases of ``origins``, or None if zero.

        Unit loads add up their own ``cases_quantity``; load order lines
        add up the ``cases_quantity`` of their origin when it has one.
        Any other origin is delegated to the parent.
        """
        # Nothing to add up: same result as a zero total.
        if not origins:
            return None

        model_name = origins[0].__name__
        if model_name == 'stock.unit_load':
            # Missing quantities count as zero, like in the line branch.
            return sum(ul.cases_quantity or 0 for ul in origins) or None
        if model_name == 'carrier.load.order.line':
            return sum(getattr(o.origin, 'cases_quantity', 0) or 0
                for o in origins if o.origin) or None
        return super().product_packages(product_key, origins, language)
|
|
|
|
|
|
# CMR report: origins, weight and packages come from TransportReportMixin.
class CMR(TransportReportMixin, metaclass=PoolMeta):
    __name__ = 'carrier.load.order.cmr'
|
|
|
|
|
|
# Road transport note report: origins, weight and packages come from
# TransportReportMixin.
class RoadTransportNote(TransportReportMixin, metaclass=PoolMeta):
    __name__ = 'carrier.load.order.road_note'
|
|
|
|
|
|
# Fields and helpers shared by the views that ask for the data needed to
# create a load, or to reuse an existing load order. Most fields are hidden
# once a load order is chosen.
class CreateLoadDataMixin(object):

    origin_name = fields.Char('Origin name', readonly=True)
    # Only outgoing, still open orders having a line whose origin matches
    # origin_name can be reused.
    load_order = fields.Many2One(
        'carrier.load.order', 'Load order',
        domain=[
            ('type', '=', 'out'),
            ('state', 'in', ['draft', 'waiting', 'running']),
            ('lines.origin', 'like', Eval('origin_name'))
            ],
        depends=['origin_name'],
        help="Define to reuse an existing Load Order.")
    warehouse = fields.Many2One(
        'stock.location', 'Warehouse',
        readonly=True,
        domain=[('type', '=', 'warehouse')],
        states={'invisible': Bool(Eval('load_order'))},
        depends=['load_order'])
    dock = fields.Many2One(
        'stock.location.dock', 'Dock',
        domain=[('location', '=', Eval('warehouse', 0))],
        states={
            'invisible': Bool(Eval('load_order'))
            },
        depends=['warehouse', 'load_order'])
    carrier = fields.Many2One(
        'carrier', 'Carrier',
        states={
            'required': (
                ~Eval('load_order') & Bool(Eval('load_purchasable'))),
            'invisible': Bool(Eval('load_order'))
            },
        depends=['load_order', 'load_purchasable'])
    vehicle_number = fields.Char(
        'Vehicle reg. number',
        states={
            'required': (
                ~Eval('load_order') & Bool(Eval('vehicle_required'))),
            'invisible': Bool(Eval('load_order'))
            },
        depends=['load_order', 'vehicle_required'])
    vehicle_required = fields.Boolean('Vehicle required')
    trailer_number = fields.Char(
        'Trailer reg. number',
        states={
            'invisible': Bool(Eval('load_order')),
            'required': (
                ~Eval('load_order') & Bool(Eval('trailer_required'))),
            },
        depends=['load_order', 'trailer_required'])
    trailer_required = fields.Boolean('Trailer required')
    load_purchasable = fields.Boolean(
        'Load purchasable',
        states={'invisible': Bool(Eval('load_order'))},
        depends=['load_order'])
    driver = fields.Char(
        'Driver',
        states={
            'invisible': Bool(Eval('load_order'))
            },
        depends=['load_order'])
    driver_identifier = fields.Char(
        'Driver identifier',
        states={
            'required': Bool(Eval('driver')),
            'invisible': Bool(Eval('load_order'))
            },
        depends=['driver', 'load_order'])
    carrier_info = fields.Text(
        'Carrier information',
        states={
            'invisible': (
                Bool(Eval('load_purchasable')) | Bool(Eval('carrier')))
            },
        depends=['load_purchasable', 'carrier'])
    driver_phone = fields.Char(
        'Driver Phone',
        states={
            'invisible': Bool(Eval('load_order'))
            },
        depends=['load_order'])
    company = fields.Many2One('company.company', 'Company', readonly=True)

    @classmethod
    def default_load_purchasable(cls):
        """Default to the 'load_purchasable' value of the configuration."""
        Configuration = Pool().get('carrier.configuration')
        return Configuration(1).load_purchasable

    @fields.depends('carrier', 'vehicle_number')
    def autocomplete_vehicle_number(self):
        """Suggest vehicle numbers through the load model helper."""
        Load = Pool().get('carrier.load')
        return Load._autocomplete_registration_numbers(
            self.carrier, 'vehicle_number', self.vehicle_number)

    @fields.depends('carrier', 'trailer_number')
    def autocomplete_trailer_number(self):
        """Suggest trailer numbers through the load model helper."""
        Load = Pool().get('carrier.load')
        return Load._autocomplete_registration_numbers(
            self.carrier, 'trailer_number', self.trailer_number)

    @fields.depends('carrier', 'driver')
    def autocomplete_driver(self):
        """Suggest drivers through the load model helper."""
        Load = Pool().get('carrier.load')
        return Load._autocomplete_registration_numbers(
            self.carrier, 'driver', self.driver)

    @fields.depends('carrier', 'driver_identifier')
    def autocomplete_driver_identifier(self):
        """Suggest driver identifiers through the load model helper."""
        Load = Pool().get('carrier.load')
        return Load._autocomplete_registration_numbers(
            self.carrier, 'driver_identifier', self.driver_identifier)

    @fields.depends('driver_phone')
    def on_change_driver_phone(self):
        """Normalise the driver phone as it is typed."""
        self.driver_phone = self.format_phone(self.driver_phone)

    @classmethod
    def format_phone(cls, value=None):
        """Return ``value`` in international phone format when possible.

        The value is returned unchanged when the phonenumbers library is
        not available or the value cannot be parsed.
        """
        if not phonenumbers:
            return value
        try:
            parsed = phonenumbers.parse(value)
        except NumberParseException:
            return value
        return phonenumbers.format_number(
            parsed, PhoneNumberFormat.INTERNATIONAL)
|
|
|
|
|
|
# Per-line unit load quantities: the quantity to load must stay between zero
# and the available quantity.
class CreateLoadDataLineMixin(object):

    available_ul_quantity = fields.Float(
        'Available ULs', digits=(16, 0), readonly=True)
    ul_quantity = fields.Float(
        'ULs', digits=(16, 0),
        domain=[
            ('ul_quantity', '<=', Eval('available_ul_quantity')),
            ('ul_quantity', '>=', 0),
            ],
        depends=['available_ul_quantity'])
|
|
|
|
|
|
# Adds grouping by unit load to the load order group key.
class LoadOrderGrouping(metaclass=PoolMeta):
    __name__ = 'carrier.load.order'

    @classmethod
    def _get_load_group_key(cls, items, item):
        """Return the parent's group key, extended for unit load grouping.

        When the party of the item's load order groups by 'unit_load',
        the key gets a 'unit_load' entry holding the item id.
        """
        key = super()._get_load_group_key(items, item)
        line = item.load_line
        groups_by_ul = (
            line and line.order.party
            and line.order.party.load_grouping_method == 'unit_load')
        if not groups_by_ul:
            return key
        if key is None:
            key = {}
        key['unit_load'] = item.id
        return key
|
|
|
|
|
|
class LoadOrderLineUL(ModelSQL):
    """Carrier Load Order Line - Stock Unit Load"""
    __name__ = 'carrier.load.order.line-stock.unit_load'
    _table = 'carrier_load_order_line_ul_rel'

    load_line = fields.Many2One(
        'carrier.load.order.line', 'Load Line',
        required=True, select=True)
    unit_load = fields.Many2One(
        'stock.unit_load', 'Unit Load',
        required=True, select=True)

    @classmethod
    def __setup__(cls):
        """Declare the uniqueness of the (load line, unit load) pair."""
        super().__setup__()
        table = cls.__table__()
        cls._sql_constraints += [
            ('load_line_ul_uniq',
                Unique(table, table.load_line, table.unit_load),
                'carrier_load_ul.msg_carrier_load_order_line-stock_unit_load_load_line_ul_uniq'),
            ]

    @classmethod
    def __register__(cls, module_name):
        """Register the table and migrate data on first creation.

        When the relation table did not exist before registration, the
        unit loads linked through their ``load_line`` to lines of done
        orders are copied into it, and ``load_line`` is then cleared on
        every unit load present in the relation.
        """
        pool = Pool()
        # NOTE(review): the handler is instantiated before the existence
        # check; if instantiating it already creates the table, `exist` is
        # always true and the migration below never runs - confirm against
        # the backend TableHandler.
        table_h = cls.__table_handler__(module_name)
        OrderLine = pool.get('carrier.load.order.line')
        Order = pool.get('carrier.load.order')
        UnitLoad = pool.get('stock.unit_load')
        relation = cls.__table__()
        line = OrderLine.__table__()
        order = Order.__table__()
        ul = UnitLoad.__table__()
        cursor = Transaction().connection.cursor()

        exist = table_h.table_exist(cls._table)

        super().__register__(module_name)

        if not exist:
            # The order's write date/user feed both the create and the
            # write audit columns of the new rows.
            source = ul.join(
                line, condition=(ul.load_line == line.id)
            ).join(
                order, condition=(line.order == order.id)
            ).select(
                order.write_date,
                order.write_date,
                order.write_uid,
                order.write_uid,
                line.id,
                ul.id,
                where=order.state == 'done')
            target_columns = [
                relation.create_date,
                relation.write_date,
                relation.create_uid,
                relation.write_uid,
                relation.load_line,
                relation.unit_load,
                ]
            cursor.execute(*relation.insert(
                columns=target_columns, values=source))

            cursor.execute(*relation.select(relation.unit_load))
            ul_ids = [row[0] for row in cursor.fetchall()]
            cursor.execute(*ul.update(
                [ul.load_line], [Null],
                where=reduce_ids(ul.id, ul_ids)))
|
|
|
|
|
|
class CarrierLoadPurchase(metaclass=PoolMeta):
|
|
__name__ = 'carrier.load.purchase'
|
|
|
|
@classmethod
def _get_lines_to_group(cls, purchase, customer):
    """Return the loaded unit loads of the parent's lines to group."""
    unit_loads = []
    for line in super()._get_lines_to_group(purchase, customer):
        unit_loads.extend(line.loaded_unit_loads)
    return unit_loads
|
|
|
|
@classmethod
def _get_line_keygroup(cls, line):
    """Return the grouping key of ``line``.

    For a unit load the key is the parent's key of its load line (its
    ``load_line`` or, failing that, the last of its ``load_lines``)
    extended with the pallet product.
    """
    if line.__name__ != 'stock.unit_load':
        return super()._get_line_keygroup(line)

    load_line = line.load_line or line.load_lines[-1]
    base_key = super()._get_line_keygroup(load_line)
    pallet_entry = ('pallet_product', line.pallet_product)
    return base_key + (pallet_entry,)
|
|
|
|
@classmethod
def _get_line_quantity(cls, line):
    """Return 1 for a unit load, otherwise the line's ``ul_quantity``."""
    is_unit_load = line.__name__ == 'stock.unit_load'
    return 1 if is_unit_load else line.ul_quantity
|
|
|
|
@classmethod
def _get_line_address(cls, line):
    """Return the parent's address for ``line``.

    A unit load is first replaced by its load line (its ``load_line``
    or, failing that, the last of its ``load_lines``).
    """
    target = line
    if line.__name__ == 'stock.unit_load':
        target = line.load_line or line.load_lines[-1]
    return super()._get_line_address(target)
|