trytond-edocument_edifact/edocument.py

467 lines
15 KiB
Python
Raw Permalink Normal View History

2018-05-02 13:50:39 +02:00
# The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
2021-09-13 08:59:22 +02:00
from datetime import datetime
2019-08-28 16:37:06 +02:00
from trytond.model import ModelSQL, ModelView, fields, Exclude
from sql.conditionals import Coalesce
from sql.operators import Equal as SqlEqual
2018-05-08 09:19:02 +02:00
from trytond.pool import Pool
from trytond.pyson import PYSONEncoder
from trytond.transaction import Transaction
2022-05-18 17:13:02 +02:00
from trytond.i18n import gettext
from trytond.exceptions import UserError
from trytond.wizard import StateTransition, StateView, Button
2021-09-13 08:59:22 +02:00
from genshi.template import TextTemplate
2019-08-16 09:24:39 +02:00
from io import open
import oyaml as yaml
import os
import tempfile
2018-05-02 13:50:39 +02:00
2019-08-16 09:24:39 +02:00
# Lower-case file extensions (leading dot included) that
# EdocumentFileManager._get_messages accepts as EDI message files.
KNOWN_EXTENSIONS = ['.txt', '.edi', '.pla']
2018-05-02 13:50:39 +02:00
2019-08-16 09:24:39 +02:00
2019-08-28 16:37:06 +02:00
class EdocumentTemplate(ModelView, ModelSQL):
    """Electronic Document Template"""
    __name__ = 'edocument.template'

    # Kind of document the template renders; the selection starts empty.
    message_type = fields.Selection([], 'Document', required=True)
    # Optional party the template is specific to.
    party = fields.Many2One('party.party', 'Party')
    # Template source text.
    template = fields.Text('Template')
    # Character encoding associated with the template.
    encoding = fields.Selection([
            ('utf-8', 'Utf-8'),
            ('cp1252', 'ANSI'),
            ], 'Encoding', required=True)

    @classmethod
    def default_encoding(cls):
        """Return the default value of the ``encoding`` field."""
        return 'utf-8'

    @classmethod
    def __setup__(cls):
        """Declare the exclusion constraint on (message type, party).

        A missing party is coalesced to -1 so that rows without a party
        are compared with each other as well.
        """
        super(EdocumentTemplate, cls).__setup__()
        table = cls.__table__()
        exclusion = Exclude(
            table,
            (table.message_type, SqlEqual),
            (Coalesce(table.party, -1), SqlEqual))
        cls._sql_constraints = [
            ('document_party_exclude', exclusion,
                'edocument_edifact.msg_edi_party_message_type'),
            ]

    def get_rec_name(self, name):
        """Return the message type followed by the party name.

        The party part is an empty string when there is no party or the
        party has no name.
        """
        party_name = ''
        if self.party:
            party_name = self.party.name or ''
        return ' '.join([self.message_type, party_name])
2019-08-28 16:37:06 +02:00
2019-08-16 09:24:39 +02:00
class EdocumentMessage(ModelView, ModelSQL):
    """EDIFACT message"""
    __name__ = 'edocument.message'

    code = fields.Char('Code', readonly=True)
    message = fields.Text('Message', readonly=True)
    # Record the message belongs to; allowed models come from get_origin().
    origin = fields.Reference('Origin', selection='get_origin', readonly=True)

    def get_rec_name(self, name):
        """Return 'EW' followed by the code left-padded with zeros to 6."""
        return 'EW{0:0>6}'.format(self.code)

    @classmethod
    def get_origin(cls):
        """Return the selection of models usable as message origin.

        The model names given by ``_get_origin`` are looked up in
        ``ir.model``; an empty choice is always offered first.
        """
        IrModel = Pool().get('ir.model')
        model_names = cls._get_origin()
        found = IrModel.search([
                ('model', 'in', model_names),
                ])
        selection = [(None, '')]
        selection.extend((m.model, m.name) for m in found)
        return selection

    @classmethod
    def _get_origin(cls):
        """Return the model names allowed as origin (none by default)."""
        return []

    @classmethod
    def create(cls, vlist):
        """Create messages, filling a missing code from the sequence.

        The sequence is read from the ``edocument.configuration`` record
        with id 1, for the company found in the values or, failing that,
        in the transaction context.
        """
        pool = Pool()
        Configuration = pool.get('edocument.configuration')

        config = Configuration(1)
        default_company = Transaction().context.get('company', None)

        # Work on copies so the caller's dictionaries are left untouched.
        vlist = [values.copy() for values in vlist]
        for values in vlist:
            if values.get('code'):
                continue
            if not config.edocument_sequence:
                continue
            sequence = config.get_multivalue(
                'edocument_sequence',
                company=values.get('company', default_company))
            values['code'] = sequence.get()
        return super().create(vlist)
2019-08-16 09:24:39 +02:00
class EdocumentMixin(object):
    # Mixin adding EDI message tracking fields to a model.

    edi_messages = fields.One2Many('edocument.message', 'origin',
        'EDI Messages')
    edi_processed = fields.Function(
        fields.Boolean('EDI Processed'), 'get_edi_processed',
        searcher='search_edi_processed')
    edi_message = fields.Function(
        fields.Char('EDI Message'), 'get_edi_message')

    def get_edi_processed(self, name=None):
        """Return True when the record has at least one EDI message."""
        return bool(self.edi_messages)

    @classmethod
    def search_edi_processed(cls, name, clause):
        """Translate a clause on ``edi_processed`` into one on messages.

        Only whether the operator is '=' and the truthiness of the
        operand are taken into account.
        """
        is_equal = bool(clause[1] == '=')
        operand = bool(clause[2])
        # '=' True and non-'=' False both ask for processed records.
        if is_equal != operand:
            return [('edi_messages', '=', None)]
        return [('edi_messages', '!=', None)]

    def get_edi_message(self, name=None):
        """Return the rec_name of the last EDI message, or ''."""
        if not self.edi_messages:
            return ''
        return self.edi_messages[-1].rec_name

    @classmethod
    def copy(cls, records, default=None):
        """Copy records without carrying their EDI messages over."""
        default = {} if default is None else default.copy()
        default['edi_messages'] = None
        return super().copy(records, default=default)

    @property
    def edi_party(self):
        """Party used as EDI counterpart; the record's party here."""
        return self.party

    def _get_edi_message_key(self, message):
        """Return the key identifying this record for ``message``."""
        return message.rec_name
2019-09-25 18:41:33 +02:00
class EdocumentImportMixin(object):
    # Mixin for models whose records are created from incoming EDI files.

    # Message type used to look up the configured paths.
    _edi_message_type = ''

    @classmethod
    def _get_template_name(cls):
        """Return the path of the YAML import template (hook)."""
        pass

    @classmethod
    def create_object_from_edi(cls, edi_content, template):
        """Build a record from one EDI message (hook).

        ``_create_objects_from_edi`` unpacks the result as
        ``(record, errors)``.
        """
        pass

    @classmethod
    def _postprocess(cls, objects):
        """Hook run on the saved records; returns them unchanged here."""
        return objects

    @classmethod
    def _template(cls):
        """Load and return the YAML template named by the class."""
        template_path = cls._get_template_name()
        with open(template_path, encoding='utf-8') as fp:
            content = fp.read()
        # NOTE(review): yaml.Loader can build arbitrary objects; the
        # template file is presumably trusted -- confirm.
        return yaml.load(content, Loader=yaml.Loader)

    @classmethod
    def _get_document_manager(cls):
        """Return the manager that provides the incoming messages."""
        return EdocumentFileManager()

    @classmethod
    def _create_objects_from_edi(cls, template=None):
        """
        Get objects from edi files

        Returns a tuple ``(records, error_files)``.
        """
        pool = Pool()
        Configuration = pool.get('edocument.configuration.path')
        ObjectModel = pool.get(cls.__name__)
        Message = pool.get('edocument.message')

        paths = Configuration._get_path(cls._edi_message_type)
        source_uri = paths.path
        error_uri = paths.error_path
        if not template:
            template = cls._template()

        records, error_files, messages, to_delete = [], [], [], []
        manager = cls._get_document_manager()
        incoming = manager._get_messages(source_uri, error_uri)
        for name, info in incoming.items():
            try:
                record, errors = cls.create_object_from_edi(info, template)
            except RuntimeError:
                # The message is skipped and its file is left in place.
                continue
            else:
                if errors:
                    error_files.append(
                        manager._handle_errors(name, errors, error_uri))
                else:
                    if record:
                        code = os.path.basename(os.path.splitext(name)[0])
                        messages.append(Message(
                                code=code,
                                message=info,
                                origin=record))
                        records.append(record)
                    # NOTE(review): the original nesting was lost in the
                    # extracted source; this assumes every error-free file
                    # is removed even when no record came out of it --
                    # confirm.
                    to_delete.append(name)

        ObjectModel.save(records)
        if messages:
            Message.save(messages)
        records = cls._postprocess(records)
        if to_delete:
            manager._handle_imported(to_delete)
        return records, error_files

    @classmethod
    def get_objects_from_edi(cls):
        """Import the pending EDI messages using the default template."""
        return cls._create_objects_from_edi()
2019-08-16 09:24:39 +02:00
class EdocumentFileManager(object):
    """Imports EDI objects from a folder."""

    @classmethod
    def _read_file(cls, filename):
        """Return the content of ``filename`` decoded as cp1252."""
        with open(filename, 'r', encoding='cp1252') as file:
            return file.read()

    @classmethod
    def _get_messages(cls, uri, error_uri):
        """Return ``{file path: content}`` for the EDI files in ``uri``.

        Only regular files whose name ends, case-insensitively, with one
        of ``KNOWN_EXTENSIONS`` are read. ``error_uri`` is not used here.
        """
        # endswith() with a tuple copes with extensions of any length,
        # where a fixed four-character slice would not.
        extensions = tuple(KNOWN_EXTENSIONS)
        messages = {}
        for entry in os.listdir(uri):
            fname = os.path.join(uri, entry)
            if not os.path.isfile(fname):
                continue
            if fname.lower().endswith(extensions):
                messages[fname] = cls._read_file(fname)
        return messages

    @classmethod
    def _handle_errors(cls, fname, errors, error_uri):
        """Write ``errors`` to an error file and return its path.

        The file is created in ``error_uri`` and is named
        ``error_<base name of fname without extension>.EDI``; it holds
        one error per line.
        """
        base_name = os.path.splitext(os.path.basename(fname))[0]
        error_fname = os.path.join(
            error_uri, 'error_{}.EDI'.format(base_name))
        with open(error_fname, 'w') as fp:
            fp.write('\n'.join(errors))
        return error_fname

    @classmethod
    def _handle_imported(cls, imported):
        """Remove the files that have been imported."""
        for file in imported:
            os.remove(file)
2021-09-13 08:59:22 +02:00
class EDocumentImportResult(ModelView):
    """Import EDIFACT Result"""
    __name__ = 'edifact_import.result'

    # Error report; its file name is taken from ``error_filename``.
    error_file = fields.Binary(
        'Errors', filename='error_filename', readonly=True)
    error_filename = fields.Char('Filename')
    message = fields.Text('Message', readonly=True)
def import_edocument_mixin(model_name, result_field):
    """Return a wizard mixin class that imports EDI documents.

    :param model_name: name of the model whose ``get_objects_from_edi``
        performs the import.
    :param result_field: name of the attribute of the ``errors`` state
        that holds the imported records.
    """

    class ImportEDocumentMixin(object):
        start = StateTransition()
        errors = StateView('edifact_import.result',
            'edocument_edifact.import_result_view_form', [
                Button('OK', 'pre_open', 'tryton-ok', default=True),
                ])
        pre_open = StateTransition()

        @property
        def result_records(self):
            """Imported records kept on the ``errors`` state."""
            return getattr(self.errors, result_field, [])

        @result_records.setter
        def result_records(self, value):
            setattr(self.errors, result_field,
                [s.id for s in (value or [])])

        def transition_start(self):
            """Run the import and gather the error files, if any.

            Returns 'errors' when at least one error file was produced,
            'pre_open' otherwise.
            """
            pool = Pool()
            ActiveModel = pool.get(model_name)
            Configuration = pool.get('edocument.configuration.path')

            paths = Configuration._get_path(ActiveModel._edi_message_type)
            results, error_files = ActiveModel.get_objects_from_edi()
            self.result_records = results
            if not error_files:
                return 'pre_open'

            # Merge every error file into a single report.
            error_file = os.path.join(paths.error_path, 'error_file')
            with open(error_file, 'w') as ef:
                for file in error_files:
                    # basename() instead of splitting on '/', so the
                    # heading is right with any path separator.
                    heading = os.path.basename(file).split('error_')[-1]
                    ef.write('%s:\n' % heading)
                    with open(file, 'r') as f:
                        for line in f:
                            ef.write('\t- %s' % line)
                    # NOTE(review): the original nesting was lost in the
                    # extracted source; this assumes one newline closes
                    # each file's section -- confirm.
                    ef.write('\n')
            with open(error_file, mode='r') as f:
                self.errors.error_file = f.read()
            return 'errors'

        def default_errors(self, fields):
            """Return the default values of the ``errors`` view."""
            return {
                'error_file': self.errors.error_file,
                'error_filename': 'EDI_error.txt',
                'message': self.errors.error_file,
                result_field: [r.id for r in self.result_records]
                }

        def transition_pre_open(self):
            return 'open_'

        def do_open_(self, action):
            """Restrict the opened action to the imported records.

            Returns None when nothing was imported.
            """
            if not self.result_records:
                return
            action['pyson_domain'] = PYSONEncoder().encode([
                    ('id', 'in', [s.id for s in self.result_records])
                    ])
            return action, {}

    return ImportEDocumentMixin
2021-09-13 08:59:22 +02:00
class EdocumentExportMixin(object):
    # Wizard mixin that renders records as EDI messages and writes them
    # to the configured export path.

    # Message type used to select the template and the export path.
    _message_type = ''

    start = StateTransition()
    params = StateView('edifact_export.params',
        'edocument_edifact.export_params_view_form',
        [
            Button('Cancel', 'end', 'tryton-cancel'),
            Button('Export', 'show_file', 'tryton-forward')
        ])
    show_file = StateView('edifact_export.file',
        'edocument_edifact.export_file_view_form', [
            Button('OK', 'end', 'tryton-ok')
        ])

    def transition_start(self):
        return 'params'

    @classmethod
    def _default_template_path_file(cls):
        """Return a file path next to the 'template' folder (hook)."""
        pass

    def _default_template_path(self):
        """Return the path of the default template of the message type.

        It is ``template/<message type in lower case>.txt`` inside the
        directory of ``_default_template_path_file()``.
        """
        return os.path.join(os.path.dirname(
                self._default_template_path_file()),
            'template',
            self._message_type.lower() + '.txt')

    def _write_file(self, file_path, file, encoding='utf-8'):
        """Write the text ``file`` to ``file_path`` using ``encoding``."""
        with open(file_path, 'w+', encoding=encoding) as edi_file:
            edi_file.write(file)

    def _edi_template(self, party):
        """Return ``(template text, encoding)`` for ``party``.

        A stored ``edocument.template`` for the message type and party
        is preferred; otherwise the default template file is read and
        'utf-8' is returned as encoding.
        """
        pool = Pool()
        EdocumentTemplate = pool.get('edocument.template')

        templates = EdocumentTemplate.search([
                ('message_type', '=', self._message_type),
                ('party', '=', party)])
        if templates:
            return templates[0].template, templates[0].encoding
        else:
            template_path = self._default_template_path()
            with open(template_path) as file:
                template = file.read()
            return template, 'utf-8'

    def generate_message(self, export_again=False):
        """Create, render and write an EDI message per active record.

        :param export_again: when false, records already processed are
            left out.
        :return: ``(message text, file name)`` of the last message, or
            ``(None, None)`` when no record qualifies.
        :raises UserError: when the records span several companies or
            EDI parties, when the sender or receiver identifier is
            missing, or when the last message has no code.
        """
        pool = Pool()
        ActiveModel = self.model
        Message = pool.get('edocument.message')
        Configuration = pool.get('edocument.configuration.path')

        domain = [('id', 'in', Transaction().context['active_ids'])]
        if not export_again:
            domain.append(('edi_processed', '=', False))
        records = ActiveModel.search(domain)
        if not records:
            return None, None

        companies = set(x.company for x in records)
        if len(companies) > 1:
            raise UserError(gettext('edocument_edifact.msg_company_unique'))
        company, = companies
        edifact_sender = [x for x in company.party.identifiers
            if x.type == 'EDI_sender']
        if not edifact_sender:
            raise UserError(gettext('edocument_edifact.msg_EDI_sender',
                    company=company.party.name))

        parties = set(x.edi_party for x in records)
        if len(parties) > 1:
            raise UserError(
                gettext('edocument_edifact.msg_party_receiver_unique'))
        party, = parties
        edifact_receiver = [x for x in party.identifiers
            if x.type == 'EDI_receiver']
        if not edifact_receiver:
            raise UserError(gettext('edocument_edifact.msg_party_EDI_receiver',
                    party=party.name))

        template, encoding = self._edi_template(party)
        loader = TextTemplate(template)
        # NOTE(review): the original nesting was lost in the extracted
        # source; this assumes the file is written once per record inside
        # the loop and the code check runs after it -- confirm.
        for record in records:
            message = Message()
            message.origin = record
            # Saved first so the code and creation date are available
            # to the template.
            message.save()
            data = {
                "message": message,
                "sender": edifact_sender[0].code,
                "receiver": edifact_receiver[0].code,
                "records": [[record._get_edi_message_key(message), record]],
                "date": message.create_date.strftime("%y%m%d"),
                "time": message.create_date.strftime("%H%M")
                }
            message.message = loader.generate(
                data=data, items=[1, 2, 3]).render()
            message.save()
            conf = Configuration._get_path(self._message_type)
            # File name built from the current timestamp without
            # separators.
            chars = ['.', ' ', '-', ':']
            f_name = ''.join(
                [c for c in str(datetime.now()) if c not in chars])
            name = 'EDI' + f_name + ".edi"
            file_name = os.path.join(conf.path, name)
            self._write_file(file_name, message.message, encoding=encoding)
        if not message.code:
            raise UserError(gettext('edocument_edifact.msg_EDI_sequence'))
        return message.message, self._message_type + message.code + '.EDI'

    def default_show_file(self, fields):
        """Return the default values of the ``show_file`` view.

        The generated message goes through a temporary text file, as
        before; the file descriptor is now closed and the temporary file
        removed afterwards.
        """
        if not Transaction().context['active_ids']:
            return {'file': None, 'file_name': None}
        message, edi_file_name = self.generate_message(
            export_again=self.params.export_again)
        if not message:
            return {}

        file_no, file_name = tempfile.mkstemp('.txt', 'tryton_')
        try:
            # mkstemp() hands back an open descriptor: wrap it so it is
            # closed instead of leaked.
            with os.fdopen(file_no, 'w+') as file_p:
                file_p.write(message)
            with open(file_name, mode='r') as file_p:
                self.show_file.file = file_p.read()
        finally:
            os.remove(file_name)
        self.show_file.file_name = edi_file_name
        return {'file': self.show_file.file, 'file_name': edi_file_name}
class EdocumentExportParams(ModelView):
    '''EDIFACT Export Params'''
    __name__ = 'edifact_export.params'

    # When set, records that already have EDI messages are exported too.
    export_again = fields.Boolean('Export Again')
class EdocumentExportFile(ModelView):
    '''EDIFACT Export File'''
    __name__ = 'edifact_export.file'

    # Generated message shown for download, with its file name.
    file = fields.Binary('File', readonly=True)
    file_name = fields.Char('File Name')