# trytond-stock_shipment_in_edi/shipment.py
#
# 417 lines
# 16 KiB
# Python

# encoding: utf-8
# The COPYRIGHT file at the top level of this repository contains the full
# copyright notices and license terms.
from trytond.model import fields
from trytond.pool import PoolMeta, Pool
from trytond.pyson import Eval
import os
from trytond.modules.stock.move import STATES as MOVE_STATES
from trytond.modules.edocument_unedifact.edocument import (EdifactMixin,
UOMS_EDI_TO_TRYTON, EdiTemplate)
from trytond.modules.edocument_unedifact.edocument import (Message, Serializer)
from trytond.modules.edocument_unedifact.edocument import (with_segment_check,
separate_section, RewindIterator, DO_NOTHING, NO_ERRORS)
from datetime import datetime
from trytond.exceptions import UserError
# Directory that contains this module file; the bundled EDI templates are
# looked up in its 'templates' sub-directory.
MODULE_PATH = os.path.dirname(os.path.abspath(__file__))

# Fallback directory for the EDI inbox and errors folders when none is set in
# the stock configuration.
DEFAULT_FILES_LOCATION = '/tmp/'

# Template file used when the stock configuration does not name one.
DEFAULT_TEMPLATE = 'DESADV_ediversa.yml'

# File extensions associated with EDI input files (not referenced in the part
# of the module visible here).
KNOWN_EXTENSIONS = ['.txt', '.edi', '.pla']
class Cron(metaclass=PoolMeta):
    """Extend ``ir.cron`` with the supplier-shipment EDI import job."""
    __name__ = 'ir.cron'

    @classmethod
    def __setup__(cls):
        """Add the EDI import entry to the selectable cron methods."""
        super().__setup__()
        # (value, label) pair; the value names the model and the classmethod
        # separated by '|'.
        edi_import_entry = (
            'stock.shipment.in|get_edi_shipments_cron',
            'Import EDI Shipment In Orders')
        cls.method.selection.append(edi_import_entry)
class Move(metaclass=PoolMeta):
    """Extend ``stock.move`` with the data announced through EDI."""
    __name__ = 'stock.move'

    # Quantity as received in the EDI message for this move.
    edi_quantity = fields.Float(
        'EDI Quantity',
        digits=(16, Eval('unit_digits', 2)),
        states=MOVE_STATES,
        depends=['state', 'unit_digits'])
    # Free-text description as received in the EDI message.
    edi_description = fields.Text('EDI Description', size=None)

    @classmethod
    def copy(cls, records, default=None):
        """Copy moves, clearing the EDI fields unless the caller sets them.

        The caller's ``default`` mapping is never modified: a copy is used.
        """
        if default:
            defaults = default.copy()
        else:
            defaults = {}
        # setdefault() without a value stores None, i.e. the copies start
        # with empty EDI data.
        for field_name in ('edi_quantity', 'edi_description'):
            defaults.setdefault(field_name)
        return super().copy(records, default=defaults)
class ShipmentIn(EdifactMixin, metaclass=PoolMeta):
    """Supplier shipment that can be created from EDI DESADV messages."""
    __name__ = 'stock.shipment.in'

    despatch_date = fields.Date('Despatch Date',
        states={
            'readonly': Eval('state').in_(['cancelled', 'done']),
            },
        depends=['state'])
    # TODO: remove this field if purchase_shipment_cost is ever installed
    carrier = fields.Many2One('carrier', 'Carrier',
        states={
            'readonly': Eval('state') != 'draft',
            },
        depends=['state'])

    @classmethod
    def import_edi_input(cls, response, template):
        """Create a supplier shipment from the text of a DESADV message.

        :param response: raw text of the EDI message.
        :param template: mapping read through ``get``; the keys used here are
            'control_chars', 'header' and 'detail'.
        :return: a ``(shipment, errors)`` pair:
            ``(DO_NOTHING, NO_ERRORS)`` when the message is not a DESADV,
            ``(None, errors)`` when the purchase cannot be found or the moves
            cannot be saved, and ``(shipment, errors)`` otherwise (``errors``
            then lists the line groups that were skipped).
        """
        pool = Pool()
        ProductCode = pool.get('product.code')
        Move = pool.get('stock.move')

        # The two helpers below are closures: they read ``product``,
        # ``quantity``, ``values``, ``move``, ``moves_``, ``purchase`` and
        # ``shipment`` from this method's scope at call time, so they are
        # only meaningful when called from inside the line-group loop.
        def get_new_move():
            """Return an unsaved draft move for the current line."""
            if not product:
                return None
            move = Move()
            move.product = product
            move.quantity = quantity
            move.uom = values.get('unit')
            move.state = 'draft'
            move.company = purchase.company
            move.currency = purchase.currency
            move.unit_price = product.list_price
            move.shipment = shipment
            if (quantity or 0) >= 0:
                # Regular reception: supplier -> warehouse input.
                move.from_location = purchase.party.supplier_location.id
                if purchase.warehouse:
                    move.to_location = purchase.warehouse.input_location.id
            else:
                # Negative quantity: goods go back to the supplier.
                if purchase.return_from_location:
                    move.from_location = purchase.return_from_location.id
                move.to_location = purchase.party.supplier_location.id
            return move

        def manage_lots():
            """Find or create the lot of the current line and assign it."""
            if not (hasattr(Move, 'lot')
                    and move.product.lot_is_required(move.from_location,
                        move.to_location)
                    and values.get('lot')):
                return
            Lot = pool.get('stock.lot')
            lot, = Lot.search([
                    ('number', '=', values.get('lot')),
                    ('product', '=', move.product)
                    ], limit=1) or [None]
            expiration_date_exist = hasattr(Lot, 'expiration_date')
            if lot and expiration_date_exist:
                # Only ever move an existing expiration date earlier.
                expiration_date = values.get('expiration_date')
                if (expiration_date and lot.expiration_date
                        and expiration_date < lot.expiration_date):
                    lot.expiration_date = expiration_date
            elif not lot:
                lot = Lot()
                lot.number = values.get('lot')
                lot.product = product
                if expiration_date_exist:
                    lot.expiration_date = values.get('expiration_date')
            lot.save()
            for move_ in moves_:
                move_.lot = lot

        total_errors = []
        control_chars = cls.set_control_chars(
            template.get('control_chars', {}))
        message = Message.from_str(response.upper().replace('\r', ''),
            characters=control_chars)
        segments_iterator = RewindIterator(message.segments)
        template_header = template.get('header', {})
        template_detail = template.get('detail', {})
        detail = [x for x in separate_section(segments_iterator, start='CPS')]
        del(segments_iterator)

        # Without a UNH segment identifying the message as DESADV, the file
        # that was read is not a despatch advice: nothing to do.
        unh = message.get_segment('UNH')
        if not unh or unh.elements[1][0] != 'DESADV':
            return DO_NOTHING, NO_ERRORS

        rffs = [x for x in message.get_segments('RFF')]
        rff, = [x for x in rffs if x.elements[0][0] == 'ON'][:1] or [None]
        # Keep only the first match so that a repeated DQ reference does not
        # break the unpacking (same approach as for the ON reference).
        reference, = ([x for x in rffs if x.elements[0][0] == 'DQ'][:1]
            or [None])
        template_rff = template_header.get('RFF')
        purchase, errors = cls._process_RFF(rff, template_rff, control_chars)
        if errors:
            total_errors += errors
        if not purchase:
            return None, total_errors

        shipment = cls()
        shipment.supplier = purchase.party
        shipment.on_change_supplier()
        shipment.warehouse = purchase.warehouse
        shipment.reference = reference.elements[0][2] if reference else None
        # Only purchase moves not yet attached to a shipment can be received.
        available_moves = [m for m in purchase.moves if not m.shipment]

        dtms = [d for d in message.get_segments('DTM')]
        template_dtm = template_header.get('DTM')
        effective_date, planned_date, despatch_date, errors = \
            cls._process_DTMs(dtms, template_dtm, control_chars)
        if errors:
            total_errors += errors
        shipment.effective_date = effective_date
        shipment.planned_date = planned_date
        if despatch_date:
            shipment.despatch_date = despatch_date

        bgm = message.get_segment('BGM')
        template_bgm = template_header.get('BGM')
        reference, errors = cls._process_BGM(bgm, template_bgm,
            control_chars)
        if errors:
            total_errors += errors
        # The BGM value replaces the reference taken from RFF+DQ above.
        shipment.reference = reference

        tdt = message.get_segment('TDT')
        if tdt:
            template_tdt = template_header.get('TDT')
            carrier, errors = cls._process_TDT(tdt, template_tdt,
                control_chars)
            shipment.carrier = carrier
            if errors:
                total_errors += errors
        del(template_header)
        shipment.save()

        scannable_codes = ProductCode.search([
                ('product', 'in', [l.product.id for l in available_moves])
                ])
        scannable_products = {pc.number: pc.product for pc in scannable_codes}
        # Segment tags accepted by the template, computed once instead of
        # once per segment.
        known_tags = {key[0:3] for key in template_detail}
        to_save = []
        for cps_group in detail:
            segments_iterator = RewindIterator(cps_group)
            linegroups = [x for x in separate_section(segments_iterator,
                    start='LIN')]
            for linegroup in linegroups:
                values = {}
                # Reset per line group: otherwise a group without any
                # template-known segment would reuse the errors of a previous
                # group (or of the header) and be skipped by mistake.
                errors = None
                for segment in linegroup:
                    if segment.tag not in known_tags:
                        continue
                    template_segment = template_detail.get(segment.tag)
                    tag = (segment.tag if segment.tag.endswith('LIN') else
                        '{}LIN'.format(segment.tag))
                    # Plain attribute lookup: the tag comes from the incoming
                    # file, so it must never be fed to eval().
                    process = getattr(cls, '_process_{}'.format(tag))
                    to_update, errors = process(segment, template_segment)
                    if errors:
                        # If there are errors the linegroup isn't processed
                        break
                    if to_update:
                        values.update(to_update)
                if errors:
                    total_errors += errors
                    continue

                product = scannable_products.get(values.get('product'))
                quantity = values.get('quantity')
                moves_ = []
                if product:
                    matching_moves = [
                        m for m in available_moves if m.product == product]
                    # NOTE(review): a matched move is not removed from
                    # available_moves, so two lines for the same product
                    # resolve to the same move -- confirm this is intended.
                    if matching_moves:
                        move = matching_moves[0]
                    else:
                        move = get_new_move()
                else:
                    # The code does not belong to a pending purchase move:
                    # look it up among all product codes.
                    product_code, = ProductCode.search([
                            ('number', '=', values.get('product'))
                            ], limit=1) or [None]
                    if not product_code:
                        continue
                    product = product_code.product
                    move = get_new_move()

                # NOTE(review): when the line carries no quantity this
                # comparison is true and the subtraction below fails; decide
                # how such lines should be reported.
                if (getattr(move, 'purchase', None)
                        and move.quantity != quantity):
                    diff = move.quantity - quantity
                    if diff > 0:
                        # Less than ordered was sent: the remainder stays in
                        # a copy that is not added to this shipment.
                        new_move, = Move.copy([move], {'quantity': abs(diff)})
                        move.quantity = quantity
                    else:
                        # More than ordered was sent: the surplus is received
                        # through an extra move on this shipment.
                        new_move = get_new_move()
                        new_move.quantity = abs(diff)
                        moves_.append(new_move)
                move.shipment = shipment
                moves_.append(move)

                move_amount = values.get('amount', None)
                for move_ in moves_:
                    move_.planned_date = shipment.planned_date
                    move_.edi_quantity = quantity  # proportional or original?
                    move_.edi_description = values.get('description')
                    # A zero or missing quantity cannot yield a unit price.
                    if move_amount and quantity:
                        move_.unit_price = move_amount / quantity
                manage_lots()
                to_save.extend(moves_)

        if to_save:
            try:
                Move.save(to_save)
            except UserError as e:
                total_errors.append(e.message)
                return None, total_errors
        return shipment, total_errors

    @classmethod
    def _process_RFF(cls, segment, template_segment, control_chars=None):
        """Return ``(purchase, errors)`` for the order-number RFF segment.

        The purchase is searched by number among those in state 'processing'
        or 'done'. When the segment is missing or no purchase matches, the
        first item is ``DO_NOTHING`` and ``errors`` holds one message.
        """
        if segment is None:
            # The message carried no RFF+ON segment at all.
            return DO_NOTHING, ['Purchase order reference (RFF+ON) not found']
        pool = Pool()
        Purchase = pool.get('purchase.purchase')
        purchase_num = segment.elements[0][2]
        purchases = Purchase.search([
                ('number', '=', purchase_num),
                ('state', 'in', ['processing', 'done'])
                ], limit=1)
        if not purchases:
            error_msg = 'Purchase number {} not found'.format(purchase_num)
            serialized_segment = Serializer(control_chars).serialize([segment])
            return DO_NOTHING, ['{}: {}'.format(error_msg, serialized_segment)]
        return purchases[0], NO_ERRORS

    @classmethod
    def _process_TDT(cls, segment, template_segment, control_chars=None):
        """Return ``(carrier, errors)`` for the transport (TDT) segment.

        The carrier is the first one whose party owns an 'EDI_supplier'
        identifier equal to the seventh element of the segment; ``None`` when
        no identifier or no carrier matches.
        """
        pool = Pool()
        Identifier = pool.get('party.identifier')
        Carrier = pool.get('carrier')
        carrier_ean = segment.elements[6]
        identifiers = Identifier.search([
                ('type', '=', 'EDI_supplier'),
                ('code', '=', carrier_ean)
                ])
        if identifiers:
            party = identifiers[0].party
            carriers = Carrier.search([('party', '=', party)])
            # The party may exist without being registered as a carrier.
            if carriers:
                return carriers[0], NO_ERRORS
        # None (not an empty dict) so that the Many2One is simply left unset.
        return None, NO_ERRORS

    @classmethod
    def _process_DTMs(cls, segments, template, control_chars=None):
        """Extract the header dates from the DTM segments.

        Qualifier '137' gives the effective date, '191' the planned date and
        '11' the despatch date. A date whose qualifier does not appear is
        returned as ``None``.

        :return: ``(effective_date, planned_date, despatch_date, errors)``.
        """
        effective_date = None
        planned_date = None
        despatch_date = None
        for segment in segments:
            date_type = segment.elements[0][0]
            if date_type == '137':
                effective_date = cls.get_datetime_obj_from_edi_date(
                    segment.elements[0][2])
            elif date_type == '191':
                planned_date = cls.get_datetime_obj_from_edi_date(
                    segment.elements[0][2])
            elif date_type == '11':
                despatch_date = cls.get_datetime_obj_from_edi_date(
                    segment.elements[0][2])
        return effective_date, planned_date, despatch_date, NO_ERRORS

    @classmethod
    @with_segment_check
    def _process_DTM(cls, segment, template, control_chars=None):
        """Return ``(effective_date, planned_date, errors)`` for one DTM.

        The planned date is read from the second element when there is one,
        otherwise it is ``None``.
        """
        effective_date = cls.get_datetime_obj_from_edi_date(
            segment.elements[0][2])
        planned_date = (cls.get_datetime_obj_from_edi_date(
                segment.elements[1]) if len(segment.elements) > 1 else None)
        return effective_date, planned_date, NO_ERRORS

    @classmethod
    def _process_BGM(cls, segment, template, control_chars=None):
        """Return the first element of the BGM segment as the reference."""
        return segment.elements[0], NO_ERRORS

    @classmethod
    def _process_LIN(cls, segment, template):
        """Return the product code carried by a LIN segment."""
        return {'product': segment.elements[2][0]}, NO_ERRORS

    @classmethod
    def _process_MOALIN(cls, segment, template):
        """Return the line amount carried by a MOA segment, as a float."""
        return {'amount': float(segment.elements[0][2])}, NO_ERRORS

    @classmethod
    def _process_QTYLIN(cls, segment, template):
        """Return unit and quantity of a QTY segment with qualifier '12'.

        Segments with any other qualifier are ignored.
        """
        pool = Pool()
        Uom = pool.get('product.uom')
        qualifier = segment.elements[0][0]
        if qualifier != '12':
            return DO_NOTHING, NO_ERRORS
        # NOTE(review): the unit code and the quantity are both read from
        # elements[0][2]; one of the two indexes looks wrong -- confirm
        # against the parser's element layout before changing it.
        if len(segment.elements[0]) > 2:
            uom_value = UOMS_EDI_TO_TRYTON.get(segment.elements[0][2], 'u')
        else:
            uom_value = 'u'
        uom, = Uom.search([('symbol', '=', uom_value)], limit=1)
        quantity = float(segment.elements[0][2])
        return {'unit': uom, 'quantity': quantity}, NO_ERRORS

    @classmethod
    def _process_IMDLIN(cls, segment, template):
        """Return the description of an IMD segment (``None`` if empty)."""
        description = segment.elements[1] or None
        return {'description': description}, NO_ERRORS

    @classmethod
    def _process_PCILIN(cls, segment, template):
        """PCI segments are accepted but carry nothing to import."""
        return DO_NOTHING, NO_ERRORS

    @classmethod
    def _process_DTMLIN(cls, segment, template):
        """Return the expiration date of a line DTM with qualifier '36'.

        Segments with any other qualifier are ignored.
        """
        if segment.elements[0][0] != '36':
            return DO_NOTHING, NO_ERRORS
        expiration_date = cls.get_datetime_obj_from_edi_date(
            segment.elements[0][2])
        # Keep only the date part; a falsy parse result is passed through.
        if expiration_date:
            expiration_date = expiration_date.date()
        return {'expiration_date': expiration_date}, NO_ERRORS

    @classmethod
    def _process_GINLIN(cls, segment, template):
        """Return the lot number of a GIN segment qualified with 'BX'."""
        if segment.elements[0] != 'BX':
            return DO_NOTHING, NO_ERRORS
        return {'lot': segment.elements[1]}, NO_ERRORS

    @classmethod
    def _process_CPSLIN(cls, segment, template):
        """CPS segments are accepted but carry nothing to import."""
        return DO_NOTHING, NO_ERRORS

    @classmethod
    def create_edi_shipments(cls):
        """Process the EDI inbox using the configured paths and template.

        Inbox and errors directories come from the stock configuration and
        fall back to ``DEFAULT_FILES_LOCATION``. The template is the
        configured path or, when none is set, ``DEFAULT_TEMPLATE`` from this
        module's 'templates' directory.

        :return: the result of ``process_edi_inputs``.
        """
        pool = Pool()
        Configuration = pool.get('stock.configuration')
        configuration = Configuration(1)
        source_path = os.path.abspath(configuration.inbox_path_edi
            or DEFAULT_FILES_LOCATION)
        errors_path = os.path.abspath(configuration.errors_path_edi
            or DEFAULT_FILES_LOCATION)
        configured_template = configuration.template_order_response_edi
        if configured_template:
            template_path = configured_template
            template_name = os.path.basename(template_path)
        else:
            template_name = DEFAULT_TEMPLATE
            template_path = os.path.join(MODULE_PATH, 'templates',
                template_name)
        template = EdiTemplate(template_name, template_path)
        return cls.process_edi_inputs(source_path, errors_path, template)

    @classmethod
    def get_edi_shipments_cron(cls):
        """Cron entry point: import the pending EDI shipments."""
        cls.create_edi_shipments()
        return True
class StockConfiguration(metaclass=PoolMeta):
    """Extend ``stock.configuration`` with the EDI import settings."""
    __name__ = 'stock.configuration'

    # Directory read for incoming EDI files; ShipmentIn.create_edi_shipments
    # falls back to DEFAULT_FILES_LOCATION when it is empty.
    inbox_path_edi = fields.Char('Inbox Path EDI')
    # Directory handed to the EDI processing as the errors path; same
    # fallback as the inbox.
    errors_path_edi = fields.Char('Errors Path')
    # Path of the template file; when empty the DEFAULT_TEMPLATE bundled with
    # the module is used.
    template_order_response_edi = fields.Char(
        'Template EDI Used for Response',
        help='Path of the YAML file.')