467 lines
15 KiB
Python
467 lines
15 KiB
Python
# The COPYRIGHT file at the top level of
|
|
# this repository contains the full copyright notices and license terms.
|
|
from datetime import datetime
|
|
from trytond.model import ModelSQL, ModelView, fields, Exclude
|
|
from sql.conditionals import Coalesce
|
|
from sql.operators import Equal as SqlEqual
|
|
from trytond.pool import Pool
|
|
from trytond.pyson import PYSONEncoder
|
|
from trytond.transaction import Transaction
|
|
from trytond.i18n import gettext
|
|
from trytond.exceptions import UserError
|
|
from trytond.wizard import StateTransition, StateView, Button
|
|
from genshi.template import TextTemplate
|
|
from io import open
|
|
import oyaml as yaml
|
|
import os
|
|
import tempfile
|
|
|
|
|
|
KNOWN_EXTENSIONS = ['.txt', '.edi', '.pla']
|
|
|
|
|
|
class EdocumentTemplate(ModelView, ModelSQL):
|
|
"""Electronic Document Template"""
|
|
__name__ = 'edocument.template'
|
|
|
|
message_type = fields.Selection([], 'Document', required=True)
|
|
party = fields.Many2One('party.party', 'Party')
|
|
template = fields.Text('Template')
|
|
encoding = fields.Selection([
|
|
('utf-8', 'Utf-8'),
|
|
('cp1252', 'ANSI'),
|
|
], 'Encoding', required=True)
|
|
|
|
@classmethod
|
|
def default_encoding(cls):
|
|
return 'utf-8'
|
|
|
|
@classmethod
|
|
def __setup__(cls):
|
|
super(EdocumentTemplate, cls).__setup__()
|
|
|
|
t = cls.__table__()
|
|
cls._sql_constraints = [
|
|
('document_party_exclude', Exclude(t,
|
|
(t.message_type, SqlEqual),
|
|
(Coalesce(t.party, -1), SqlEqual)),
|
|
'edocument_edifact.msg_edi_party_message_type')]
|
|
|
|
def get_rec_name(self, name):
|
|
return ' '.join([
|
|
self.message_type,
|
|
self.party and self.party.name or ''
|
|
])
|
|
|
|
|
|
class EdocumentMessage(ModelView, ModelSQL):
|
|
"""EDIFACT message"""
|
|
__name__ = 'edocument.message'
|
|
|
|
code = fields.Char('Code', readonly=True)
|
|
message = fields.Text('Message', readonly=True)
|
|
origin = fields.Reference('Origin', selection='get_origin', readonly=True)
|
|
|
|
def get_rec_name(self, name):
|
|
return 'EW' + '{0:0>6}'.format(self.code)
|
|
|
|
@classmethod
|
|
def get_origin(cls):
|
|
IrModel = Pool().get('ir.model')
|
|
models = cls._get_origin()
|
|
models = IrModel.search([
|
|
('model', 'in', models),
|
|
])
|
|
return [(None, '')] + [(m.model, m.name) for m in models]
|
|
|
|
@classmethod
|
|
def _get_origin(cls):
|
|
return []
|
|
|
|
@classmethod
|
|
def create(cls, vlist):
|
|
pool = Pool()
|
|
Configuration = pool.get('edocument.configuration')
|
|
config = Configuration(1)
|
|
|
|
default_company = Transaction().context.get('company', None)
|
|
vlist = [x.copy() for x in vlist]
|
|
for values in vlist:
|
|
if not values.get('code') and config.edocument_sequence:
|
|
values['code'] = config.get_multivalue(
|
|
'edocument_sequence',
|
|
company=values.get('company', default_company)).get()
|
|
return super().create(vlist)
|
|
|
|
|
|
class EdocumentMixin(object):
|
|
|
|
edi_messages = fields.One2Many('edocument.message', 'origin',
|
|
'EDI Messages')
|
|
edi_processed = fields.Function(
|
|
fields.Boolean('EDI Processed'), 'get_edi_processed',
|
|
searcher='search_edi_processed')
|
|
edi_message = fields.Function(
|
|
fields.Char('EDI Message'), 'get_edi_message')
|
|
|
|
def get_edi_processed(self, name=None):
|
|
return bool(self.edi_messages)
|
|
|
|
@classmethod
|
|
def search_edi_processed(cls, name, clause):
|
|
exist = {
|
|
True: [('edi_messages', '=', None)],
|
|
False: [('edi_messages', '!=', None)]
|
|
}
|
|
return exist[bool(clause[1] == '=') ^ bool(clause[2])]
|
|
|
|
def get_edi_message(self, name=None):
|
|
return self.edi_messages[-1].rec_name if self.edi_messages else ''
|
|
|
|
@classmethod
|
|
def copy(cls, records, default=None):
|
|
if default is None:
|
|
default = {}
|
|
else:
|
|
default = default.copy()
|
|
default['edi_messages'] = None
|
|
return super().copy(records, default=default)
|
|
|
|
@property
|
|
def edi_party(self):
|
|
return self.party
|
|
|
|
def _get_edi_message_key(self, message):
|
|
return message.rec_name
|
|
|
|
|
|
class EdocumentImportMixin(object):
|
|
_edi_message_type = ''
|
|
|
|
@classmethod
|
|
def _get_template_name(cls):
|
|
pass
|
|
|
|
@classmethod
|
|
def create_object_from_edi(cls, edi_content, template):
|
|
pass
|
|
|
|
@classmethod
|
|
def _postprocess(cls, objects):
|
|
return objects
|
|
|
|
@classmethod
|
|
def _template(cls):
|
|
template_path = cls._get_template_name()
|
|
with open(template_path, encoding='utf-8') as fp:
|
|
return yaml.load(fp.read(), Loader=yaml.Loader)
|
|
|
|
@classmethod
|
|
def _get_document_manager(cls):
|
|
return EdocumentFileManager()
|
|
|
|
@classmethod
|
|
def _create_objects_from_edi(cls, template=None):
|
|
"""
|
|
Get objects from edi files
|
|
"""
|
|
pool = Pool()
|
|
Configuration = pool.get('edocument.configuration.path')
|
|
ObjectModel = pool.get(cls.__name__)
|
|
Message = pool.get('edocument.message')
|
|
|
|
paths = Configuration._get_path(cls._edi_message_type)
|
|
source_uri, error_uri = paths.path, paths.error_path
|
|
|
|
if not template:
|
|
template = cls._template()
|
|
records = []
|
|
error_files = []
|
|
messages = []
|
|
to_delete = []
|
|
manager = cls._get_document_manager()
|
|
for name, info in manager._get_messages(source_uri, error_uri).items():
|
|
try:
|
|
record, errors = cls.create_object_from_edi(info, template)
|
|
except RuntimeError:
|
|
continue
|
|
else:
|
|
if errors:
|
|
error_files.append(manager._handle_errors(
|
|
name, errors, error_uri))
|
|
else:
|
|
if record:
|
|
message = Message(
|
|
code=os.path.basename(os.path.splitext(name)[0]),
|
|
message=info,
|
|
origin=record)
|
|
messages.append(message)
|
|
records.append(record)
|
|
to_delete.append(name)
|
|
|
|
ObjectModel.save(records)
|
|
if messages:
|
|
Message.save(messages)
|
|
|
|
records = cls._postprocess(records)
|
|
|
|
if to_delete:
|
|
manager._handle_imported(to_delete)
|
|
|
|
return records, error_files
|
|
|
|
@classmethod
|
|
def get_objects_from_edi(cls):
|
|
return cls._create_objects_from_edi()
|
|
|
|
|
|
class EdocumentFileManager(object):
|
|
''' Imports EDI objects from a folder '''
|
|
|
|
@classmethod
|
|
def _read_file(cls, filename):
|
|
with open(filename, 'r', encoding='cp1252') as file:
|
|
return file.read()
|
|
|
|
@classmethod
|
|
def _get_messages(cls, uri, error_uri):
|
|
files = [os.path.join(uri, file) for file in
|
|
os.listdir(uri) if os.path.isfile(os.path.join(uri, file))]
|
|
|
|
return {fname: cls._read_file(fname) for fname in files
|
|
if fname[-4:].lower() in KNOWN_EXTENSIONS}
|
|
|
|
@classmethod
|
|
def _handle_errors(cls, fname, errors, error_uri):
|
|
error_fname = os.path.join(
|
|
error_uri, 'error_{}.EDI'.format(
|
|
os.path.splitext(os.path.basename(fname))[0]))
|
|
with open(error_fname, 'w') as fp:
|
|
fp.write('\n'.join(errors))
|
|
return error_fname
|
|
|
|
@classmethod
|
|
def _handle_imported(cls, imported):
|
|
for file in imported:
|
|
os.remove(file)
|
|
|
|
|
|
class EDocumentImportResult(ModelView):
|
|
"""Import EDIFACT Result"""
|
|
__name__ = 'edifact_import.result'
|
|
|
|
error_file = fields.Binary('Errors', filename='error_filename',
|
|
readonly=True)
|
|
error_filename = fields.Char('Filename')
|
|
message = fields.Text('Message', readonly=True)
|
|
|
|
|
|
def import_edocument_mixin(model_name, result_field):
|
|
|
|
class ImportEDocumentMixin(object):
|
|
|
|
start = StateTransition()
|
|
errors = StateView('edifact_import.result',
|
|
'edocument_edifact.import_result_view_form', [
|
|
Button('OK', 'pre_open', 'tryton-ok', default=True),
|
|
])
|
|
pre_open = StateTransition()
|
|
|
|
@property
|
|
def result_records(self):
|
|
return getattr(self.errors, result_field, [])
|
|
|
|
@result_records.setter
|
|
def result_records(self, value):
|
|
setattr(self.errors, result_field, [s.id for s in (value or [])])
|
|
|
|
def transition_start(self):
|
|
pool = Pool()
|
|
ActiveModel = pool.get(model_name)
|
|
Configuration = pool.get('edocument.configuration.path')
|
|
|
|
paths = Configuration._get_path(ActiveModel._edi_message_type)
|
|
|
|
results, error_files = ActiveModel.get_objects_from_edi()
|
|
self.result_records = results
|
|
if error_files:
|
|
error_file = os.path.join(paths.error_path, 'error_file')
|
|
with open(error_file, 'w') as ef:
|
|
for file in error_files:
|
|
ef.write('%s:\n' % file.split('/')[-1].split('error_')[-1])
|
|
with open(file, 'r') as f:
|
|
for line in f:
|
|
ef.write('\t- %s' % line)
|
|
ef.write('\n')
|
|
with open(error_file, mode='r') as f:
|
|
self.errors.error_file = f.read()
|
|
return 'errors'
|
|
return 'pre_open'
|
|
|
|
def default_errors(self, fields):
|
|
return {
|
|
'error_file': self.errors.error_file,
|
|
'error_filename': 'EDI_error.txt',
|
|
'message': self.errors.error_file,
|
|
result_field: [r.id for r in self.result_records]
|
|
}
|
|
|
|
def transition_pre_open(self):
|
|
return 'open_'
|
|
|
|
def do_open_(self, action):
|
|
if not self.result_records:
|
|
return
|
|
action['pyson_domain'] = PYSONEncoder().encode([
|
|
('id', 'in', [s.id for s in self.result_records])
|
|
])
|
|
return action, {}
|
|
|
|
return ImportEDocumentMixin
|
|
|
|
|
|
class EdocumentExportMixin(object):
|
|
_message_type = ''
|
|
|
|
start = StateTransition()
|
|
params = StateView('edifact_export.params',
|
|
'edocument_edifact.export_params_view_form',
|
|
[
|
|
Button('Cancel', 'end', 'tryton-cancel'),
|
|
Button('Export', 'show_file', 'tryton-forward')
|
|
])
|
|
show_file = StateView('edifact_export.file',
|
|
'edocument_edifact.export_file_view_form', [
|
|
Button('OK', 'end', 'tryton-ok')
|
|
])
|
|
|
|
def transition_start(self):
|
|
return 'params'
|
|
|
|
@classmethod
|
|
def _default_template_path_file(cls):
|
|
pass
|
|
|
|
def _default_template_path(self):
|
|
return os.path.join(os.path.dirname(
|
|
self._default_template_path_file()),
|
|
'template',
|
|
self._message_type.lower() + '.txt')
|
|
|
|
def _write_file(self, file_path, file, encoding='utf-8'):
|
|
with open(file_path, 'w+', encoding=encoding) as edi_file:
|
|
edi_file.write(file)
|
|
|
|
def _edi_template(self, party):
|
|
pool = Pool()
|
|
EdocumentTemplate = pool.get('edocument.template')
|
|
|
|
templates = EdocumentTemplate.search([
|
|
('message_type', '=', self._message_type),
|
|
('party', '=', party)])
|
|
if templates:
|
|
return templates[0].template, templates[0].encoding
|
|
else:
|
|
template_path = self._default_template_path()
|
|
with open(template_path) as file:
|
|
template = file.read()
|
|
return template, 'utf-8'
|
|
|
|
def generate_message(self, export_again=False):
|
|
pool = Pool()
|
|
ActiveModel = self.model
|
|
Message = pool.get('edocument.message')
|
|
Configuration = pool.get('edocument.configuration.path')
|
|
|
|
domain = [('id', 'in', Transaction().context['active_ids'])]
|
|
if not export_again:
|
|
domain.append(('edi_processed', '=', False))
|
|
|
|
records = ActiveModel.search(domain)
|
|
if not records:
|
|
return None, None
|
|
|
|
companies = set(x.company for x in records)
|
|
if len(companies) > 1:
|
|
raise UserError(gettext('edocument_edifact.msg_company_unique'))
|
|
|
|
company, = companies
|
|
edifact_sender = [x for x in company.party.identifiers
|
|
if x.type == 'EDI_sender']
|
|
if not edifact_sender:
|
|
raise UserError(gettext('edocument_edifact.msg_EDI_sender',
|
|
company=company.party.name))
|
|
parties = set(x.edi_party for x in records)
|
|
if len(parties) > 1:
|
|
raise UserError(
|
|
gettext('edocument_edifact.msg_party_receiver_unique'))
|
|
|
|
party, = parties
|
|
edifact_receiver = [x for x in party.identifiers
|
|
if x.type == 'EDI_receiver']
|
|
if not edifact_receiver:
|
|
raise UserError(gettext('edocument_edifact.msg_party_EDI_receiver',
|
|
party=party.name))
|
|
|
|
template, encoding = self._edi_template(party)
|
|
loader = TextTemplate(template)
|
|
for record in records:
|
|
message = Message()
|
|
message.origin = record
|
|
message.save()
|
|
data = {
|
|
"message": message,
|
|
"sender": edifact_sender[0].code,
|
|
"receiver": edifact_receiver[0].code,
|
|
"records": [[record._get_edi_message_key(message), record]],
|
|
"date": message.create_date.strftime("%y%m%d"),
|
|
"time": message.create_date.strftime("%H%M")
|
|
}
|
|
message.message = loader.generate(
|
|
data=data, items=[1, 2, 3]).render()
|
|
message.save()
|
|
|
|
conf = Configuration._get_path(self._message_type)
|
|
chars = ['.', ' ', '-', ':']
|
|
f_name = ''.join(
|
|
[c for c in str(datetime.now()) if c not in chars])
|
|
name = 'EDI' + f_name + ".edi"
|
|
file_name = os.path.join(conf.path, name)
|
|
self._write_file(file_name, message.message, encoding=encoding)
|
|
if not message.code:
|
|
raise UserError(gettext('edocument_edifact.msg_EDI_sequence'))
|
|
return message.message, self._message_type + message.code + '.EDI'
|
|
|
|
def default_show_file(self, fields):
|
|
if not Transaction().context['active_ids']:
|
|
return {'file': None, 'file_name': None}
|
|
|
|
message, edi_file_name = self.generate_message(
|
|
export_again=self.params.export_again)
|
|
|
|
if not message:
|
|
return {}
|
|
file_no, file_name = tempfile.mkstemp('.txt', 'tryton_')
|
|
with open(file_name, 'w+') as file_p:
|
|
file_p.write(message)
|
|
file_p.close()
|
|
with open(file_name, mode='r') as file_p:
|
|
self.show_file.file = file_p.read()
|
|
self.show_file.file_name = edi_file_name
|
|
return {'file': self.show_file.file, 'file_name': edi_file_name}
|
|
|
|
|
|
class EdocumentExportParams(ModelView):
|
|
'''EDIFACT Export Params'''
|
|
__name__ = 'edifact_export.params'
|
|
|
|
export_again = fields.Boolean('Export Again')
|
|
|
|
|
|
class EdocumentExportFile(ModelView):
|
|
'''EDIFACT Export File'''
|
|
__name__ = 'edifact_export.file'
|
|
|
|
file = fields.Binary('File', readonly=True)
|
|
file_name = fields.Char('File Name')
|