# trytond-edocument_edifact/edocument.py
#
# 467 lines
# 15 KiB
# Python

# The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
from datetime import datetime
from trytond.model import ModelSQL, ModelView, fields, Exclude
from sql.conditionals import Coalesce
from sql.operators import Equal as SqlEqual
from trytond.pool import Pool
from trytond.pyson import PYSONEncoder
from trytond.transaction import Transaction
from trytond.i18n import gettext
from trytond.exceptions import UserError
from trytond.wizard import StateTransition, StateView, Button
from genshi.template import TextTemplate
from io import open
import oyaml as yaml
import os
import tempfile
# File suffixes (lower case, leading dot included) that
# EdocumentFileManager._get_messages accepts when it scans a directory.
KNOWN_EXTENSIONS = ['.txt', '.edi', '.pla']
class EdocumentTemplate(ModelView, ModelSQL):
    """Electronic Document Template"""
    __name__ = 'edocument.template'

    # Document kind the template belongs to. The selection list is empty
    # here -- presumably extended by the modules that define message types.
    message_type = fields.Selection([], 'Document', required=True)
    # Optional party the template is specific to.
    party = fields.Many2One('party.party', 'Party')
    # Template source text.
    template = fields.Text('Template')
    # Encoding stored with the template; defaults to UTF-8.
    encoding = fields.Selection(
        [
            ('utf-8', 'Utf-8'),
            ('cp1252', 'ANSI'),
        ],
        'Encoding', required=True)

    @classmethod
    def default_encoding(cls):
        """Return the default value of ``encoding``."""
        return 'utf-8'

    @classmethod
    def __setup__(cls):
        """Register the exclusion constraint on message type and party."""
        super().__setup__()
        table = cls.__table__()
        # Both columns are compared with "="; a missing party is folded to
        # -1 so that rows without party also take part in the comparison.
        one_per_type_and_party = Exclude(
            table,
            (table.message_type, SqlEqual),
            (Coalesce(table.party, -1), SqlEqual))
        cls._sql_constraints = [
            ('document_party_exclude', one_per_type_and_party,
                'edocument_edifact.msg_edi_party_message_type'),
        ]

    def get_rec_name(self, name):
        """Return ``"<message_type> <party name>"``.

        The party part is an empty string when there is no party or the
        party has no name, so the result then ends with a space.
        """
        party_name = self.party.name if self.party else None
        return ' '.join([self.message_type, party_name or ''])
class EdocumentMessage(ModelView, ModelSQL):
    """EDIFACT message"""
    __name__ = 'edocument.message'

    # Message number; filled from the configured sequence on create when
    # the caller does not supply one.
    code = fields.Char('Code', readonly=True)
    # Full text of the message.
    message = fields.Text('Message', readonly=True)
    # Record the message was generated from / imported into.
    origin = fields.Reference('Origin', selection='get_origin', readonly=True)

    def get_rec_name(self, name):
        """Return ``'EW'`` followed by the code left-padded with zeros to 6."""
        return 'EW{0:0>6}'.format(self.code)

    @classmethod
    def get_origin(cls):
        """Return the selection for ``origin``.

        The list starts with an empty choice, followed by one
        ``(model, name)`` pair per ``ir.model`` record whose model is listed
        by ``_get_origin``.
        """
        IrModel = Pool().get('ir.model')
        found = IrModel.search([
            ('model', 'in', cls._get_origin()),
            ])
        selection = [(None, '')]
        selection.extend((m.model, m.name) for m in found)
        return selection

    @classmethod
    def _get_origin(cls):
        """Return the model names allowed as origin (none by default)."""
        return []

    @classmethod
    def create(cls, vlist):
        """Create messages, numbering those that come without a code.

        The code is taken from the ``edocument_sequence`` of the
        ``edocument.configuration`` record with id 1, resolved for the
        company given in the values or, failing that, in the context.
        """
        Configuration = Pool().get('edocument.configuration')
        config = Configuration(1)
        context_company = Transaction().context.get('company', None)

        # Work on copies so the caller's dictionaries are left untouched.
        vlist = [values.copy() for values in vlist]
        for values in vlist:
            if values.get('code') or not config.edocument_sequence:
                continue
            sequence = config.get_multivalue(
                'edocument_sequence',
                company=values.get('company', context_company))
            values['code'] = sequence.get()
        return super().create(vlist)
# Mixin adding EDI message tracking (the messages themselves plus the derived
# "processed" flag and last message name) to a model.
class EdocumentMixin(object):
    # Messages whose origin is this record.
    edi_messages = fields.One2Many(
        'edocument.message', 'origin', 'EDI Messages')
    # True when the record has at least one message.
    edi_processed = fields.Function(
        fields.Boolean('EDI Processed'),
        'get_edi_processed', searcher='search_edi_processed')
    # Record name of the last message in ``edi_messages``.
    edi_message = fields.Function(
        fields.Char('EDI Message'), 'get_edi_message')

    def get_edi_processed(self, name=None):
        """Return whether the record has any EDI message."""
        return bool(self.edi_messages)

    @classmethod
    def search_edi_processed(cls, name, clause):
        """Translate a clause on ``edi_processed`` into one on ``edi_messages``.

        Only the ``'='`` operator is recognised explicitly; every other
        operator is handled as its negation.
        """
        operator, operand = clause[1], clause[2]
        # "= False" and "<anything else> True" both ask for records
        # without messages.
        wants_unprocessed = (operator == '=') != bool(operand)
        if wants_unprocessed:
            return [('edi_messages', '=', None)]
        return [('edi_messages', '!=', None)]

    def get_edi_message(self, name=None):
        """Return the record name of the last message, or ``''`` if none."""
        if not self.edi_messages:
            return ''
        return self.edi_messages[-1].rec_name

    @classmethod
    def copy(cls, records, default=None):
        """Copy records without carrying their EDI messages over."""
        default = {} if default is None else default.copy()
        default['edi_messages'] = None
        return super().copy(records, default=default)

    @property
    def edi_party(self):
        """Party used as EDI counterpart; the record's ``party`` here."""
        return self.party

    def _get_edi_message_key(self, message):
        """Return the key identifying ``message``: its record name."""
        return message.rec_name
# Mixin for models whose records are created from incoming EDI messages.
class EdocumentImportMixin(object):
    # Message type passed to the path configuration lookup; empty here.
    _edi_message_type = ''

    @classmethod
    def _get_template_name(cls):
        """Hook: path of the YAML template file. Returns None here."""
        return None

    @classmethod
    def create_object_from_edi(cls, edi_content, template):
        """Hook: build a record from one message. Returns None here.

        ``_create_objects_from_edi`` unpacks the result as
        ``(record, errors)``, so real implementations must return a pair.
        """
        return None

    @classmethod
    def _postprocess(cls, objects):
        """Hook run on the saved records; returns them unchanged here."""
        return objects

    @classmethod
    def _template(cls):
        """Load and return the YAML template named by ``_get_template_name``."""
        with open(cls._get_template_name(), encoding='utf-8') as stream:
            source = stream.read()
        # NOTE(review): yaml.Loader can build arbitrary Python objects; this
        # is only acceptable while template files are trusted.
        return yaml.load(source, Loader=yaml.Loader)

    @classmethod
    def _get_document_manager(cls):
        """Return the object used to fetch and dispose of messages."""
        return EdocumentFileManager()

    @classmethod
    def _create_objects_from_edi(cls, template=None):
        """
        Get objects from edi files

        Returns a ``(records, error_files)`` pair: the saved (and
        post-processed) records and the error files produced by the manager.
        """
        pool = Pool()
        PathConfiguration = pool.get('edocument.configuration.path')
        Target = pool.get(cls.__name__)
        Message = pool.get('edocument.message')

        paths = PathConfiguration._get_path(cls._edi_message_type)
        source_uri = paths.path
        error_uri = paths.error_path
        if not template:
            template = cls._template()

        records, error_files, messages, to_delete = [], [], [], []
        manager = cls._get_document_manager()
        incoming = manager._get_messages(source_uri, error_uri)
        # NOTE(review): the nesting below was reconstructed from a source
        # whose indentation was lost. It assumes a message is queued for
        # deletion only when it produced a record -- confirm upstream.
        for name, content in incoming.items():
            try:
                record, errors = cls.create_object_from_edi(content, template)
            except RuntimeError:
                # NOTE(review): the message is skipped without any trace.
                continue
            if errors:
                error_files.append(
                    manager._handle_errors(name, errors, error_uri))
                continue
            if not record:
                continue
            # The message code is the file name without directory/extension.
            code = os.path.basename(os.path.splitext(name)[0])
            messages.append(
                Message(code=code, message=content, origin=record))
            records.append(record)
            to_delete.append(name)

        Target.save(records)
        if messages:
            Message.save(messages)
        records = cls._postprocess(records)
        if to_delete:
            manager._handle_imported(to_delete)
        return records, error_files

    @classmethod
    def get_objects_from_edi(cls):
        """Import with the default template; see ``_create_objects_from_edi``."""
        return cls._create_objects_from_edi()
class EdocumentFileManager(object):
    """Fetch EDI messages from a local folder and dispose of them."""

    @classmethod
    def _read_file(cls, filename):
        """Return the text of ``filename``, decoded as cp1252."""
        with open(filename, 'r', encoding='cp1252') as file:
            return file.read()

    @classmethod
    def _get_messages(cls, uri, error_uri):
        """Return ``{path: content}`` for the message files found in ``uri``.

        Only regular files whose name ends (case-insensitively) with one of
        ``KNOWN_EXTENSIONS`` are read. ``error_uri`` is not used here.
        """
        # str.endswith accepts a tuple, so suffixes of any length work; the
        # previous fixed 4-character slice could only match 4-char entries.
        suffixes = tuple(KNOWN_EXTENSIONS)
        messages = {}
        for entry in os.listdir(uri):
            fname = os.path.join(uri, entry)
            if not os.path.isfile(fname):
                continue
            if fname.lower().endswith(suffixes):
                messages[fname] = cls._read_file(fname)
        return messages

    @classmethod
    def _handle_errors(cls, fname, errors, error_uri):
        """Write ``errors`` (one per line) to ``error_uri`` and return the path.

        The file is named ``error_<base name of fname without extension>.EDI``.
        """
        base_name = os.path.splitext(os.path.basename(fname))[0]
        error_fname = os.path.join(
            error_uri, 'error_{}.EDI'.format(base_name))
        with open(error_fname, 'w') as fp:
            fp.write('\n'.join(errors))
        return error_fname

    @classmethod
    def _handle_imported(cls, imported):
        """Delete the files listed in ``imported``."""
        for file in imported:
            os.remove(file)
class EDocumentImportResult(ModelView):
    """Import EDIFACT Result"""
    __name__ = 'edifact_import.result'

    # Error report offered as a file; its name comes from error_filename.
    error_file = fields.Binary(
        'Errors', filename='error_filename', readonly=True)
    # File name shown for error_file.
    error_filename = fields.Char('Filename')
    # Text shown in the result form.
    message = fields.Text('Message', readonly=True)
def import_edocument_mixin(model_name, result_field):
    """Return a wizard mixin class that imports EDI messages.

    ``model_name`` is the pool name of the model whose
    ``get_objects_from_edi`` performs the import; ``result_field`` is the
    attribute of the ``errors`` state used to hold the imported records.
    """

    # States: start -> (errors ->) pre_open -> 'open_'. The 'open_' state
    # itself is not defined here.
    class ImportEDocumentMixin(object):
        start = StateTransition()
        errors = StateView(
            'edifact_import.result',
            'edocument_edifact.import_result_view_form',
            [
                Button('OK', 'pre_open', 'tryton-ok', default=True),
            ])
        pre_open = StateTransition()

        @property
        def result_records(self):
            """Imported records kept on the ``errors`` state ([] if unset)."""
            return getattr(self.errors, result_field, [])

        @result_records.setter
        def result_records(self, value):
            ids = [record.id for record in (value or [])]
            setattr(self.errors, result_field, ids)

        def transition_start(self):
            """Run the import; go to 'errors' if any error file was produced."""
            pool = Pool()
            ActiveModel = pool.get(model_name)
            Configuration = pool.get('edocument.configuration.path')
            paths = Configuration._get_path(ActiveModel._edi_message_type)

            results, error_files = ActiveModel.get_objects_from_edi()
            self.result_records = results
            if not error_files:
                return 'pre_open'

            # Merge every error file into a single report, one section per
            # file, then load the report back as text.
            report_path = os.path.join(paths.error_path, 'error_file')
            with open(report_path, 'w') as report:
                for path in error_files:
                    title = path.split('/')[-1].split('error_')[-1]
                    report.write('%s:\n' % title)
                    with open(path, 'r') as source:
                        for line in source:
                            report.write('\t- %s' % line)
                    # NOTE(review): assumed to close each file's section;
                    # the original indentation of this write was lost.
                    report.write('\n')
            with open(report_path, mode='r') as report:
                self.errors.error_file = report.read()
            return 'errors'

        def default_errors(self, fields):
            """Defaults of the result form: report, its name and the records."""
            record_ids = [record.id for record in self.result_records]
            return {
                'error_file': self.errors.error_file,
                'error_filename': 'EDI_error.txt',
                'message': self.errors.error_file,
                result_field: record_ids,
                }

        def transition_pre_open(self):
            return 'open_'

        def do_open_(self, action):
            """Restrict the opened action to the imported records."""
            if not self.result_records:
                return
            imported_ids = [record.id for record in self.result_records]
            action['pyson_domain'] = PYSONEncoder().encode([
                ('id', 'in', imported_ids),
                ])
            return action, {}

    return ImportEDocumentMixin
# Wizard mixin that renders the selected records as EDI messages, writes each
# message to the configured folder and offers the result for download.
# It relies on a ``model`` attribute that is not defined in this class.
class EdocumentExportMixin(object):
    # Message type used for template and path lookups; empty here.
    _message_type = ''

    start = StateTransition()
    params = StateView(
        'edifact_export.params',
        'edocument_edifact.export_params_view_form',
        [
            Button('Cancel', 'end', 'tryton-cancel'),
            Button('Export', 'show_file', 'tryton-forward'),
        ])
    show_file = StateView(
        'edifact_export.file',
        'edocument_edifact.export_file_view_form',
        [
            Button('OK', 'end', 'tryton-ok'),
        ])

    def transition_start(self):
        return 'params'

    @classmethod
    def _default_template_path_file(cls):
        """Hook: path whose directory holds ``template/``. None here."""
        pass

    def _default_template_path(self):
        """Return ``<dir>/template/<message type in lower case>.txt``."""
        base_dir = os.path.dirname(self._default_template_path_file())
        return os.path.join(
            base_dir, 'template', self._message_type.lower() + '.txt')

    def _write_file(self, file_path, file, encoding='utf-8'):
        """Write the text ``file`` to ``file_path`` using ``encoding``."""
        with open(file_path, 'w+', encoding=encoding) as edi_file:
            edi_file.write(file)

    def _edi_template(self, party):
        """Return ``(template text, output encoding)`` for ``party``.

        A stored ``edocument.template`` for this message type and party wins;
        otherwise the bundled template file is used with ``'utf-8'``.
        """
        EdocumentTemplate = Pool().get('edocument.template')
        templates = EdocumentTemplate.search([
            ('message_type', '=', self._message_type),
            ('party', '=', party)])
        if templates:
            return templates[0].template, templates[0].encoding
        # Read the bundled file as UTF-8 explicitly instead of relying on the
        # locale, in line with the encoding reported for it below.
        with open(self._default_template_path(), encoding='utf-8') as file:
            template = file.read()
        return template, 'utf-8'

    def generate_message(self, export_again=False):
        """Create, render and write one EDI message per selected record.

        Records come from ``active_ids`` in the context; unless
        ``export_again`` is true, already processed records are left out.

        Returns ``(None, None)`` when nothing is left to export, otherwise
        the text of the last message and its download name.

        Raises ``UserError`` when the records span several companies or
        parties, when the company lacks an ``EDI_sender`` identifier, when
        the party lacks an ``EDI_receiver`` identifier, or when the last
        message has no code.
        """
        pool = Pool()
        ActiveModel = self.model
        Message = pool.get('edocument.message')
        Configuration = pool.get('edocument.configuration.path')

        domain = [('id', 'in', Transaction().context['active_ids'])]
        if not export_again:
            domain.append(('edi_processed', '=', False))
        records = ActiveModel.search(domain)
        if not records:
            return None, None

        companies = {record.company for record in records}
        if len(companies) > 1:
            raise UserError(gettext('edocument_edifact.msg_company_unique'))
        company, = companies
        edifact_sender = [
            identifier for identifier in company.party.identifiers
            if identifier.type == 'EDI_sender']
        if not edifact_sender:
            raise UserError(gettext('edocument_edifact.msg_EDI_sender',
                company=company.party.name))

        parties = {record.edi_party for record in records}
        if len(parties) > 1:
            raise UserError(
                gettext('edocument_edifact.msg_party_receiver_unique'))
        party, = parties
        edifact_receiver = [
            identifier for identifier in party.identifiers
            if identifier.type == 'EDI_receiver']
        if not edifact_receiver:
            raise UserError(gettext('edocument_edifact.msg_party_EDI_receiver',
                party=party.name))

        template, encoding = self._edi_template(party)
        loader = TextTemplate(template)
        # Loop-invariant: target folder and the characters stripped from the
        # timestamp used in file names.
        conf = Configuration._get_path(self._message_type)
        chars = ['.', ' ', '-', ':']
        # NOTE(review): the loop extent was reconstructed from a source whose
        # indentation was lost; it assumes one file is written per record and
        # that the code check/return happen once, after the loop.
        for record in records:
            message = Message()
            message.origin = record
            # Saved before rendering: the template data uses create_date.
            message.save()
            data = {
                "message": message,
                "sender": edifact_sender[0].code,
                "receiver": edifact_receiver[0].code,
                "records": [[record._get_edi_message_key(message), record]],
                "date": message.create_date.strftime("%y%m%d"),
                "time": message.create_date.strftime("%H%M")
            }
            message.message = loader.generate(
                data=data, items=[1, 2, 3]).render()
            message.save()
            f_name = ''.join(
                [c for c in str(datetime.now()) if c not in chars])
            name = 'EDI' + f_name + ".edi"
            file_name = os.path.join(conf.path, name)
            self._write_file(file_name, message.message, encoding=encoding)
        if not message.code:
            raise UserError(gettext('edocument_edifact.msg_EDI_sequence'))
        return message.message, self._message_type + message.code + '.EDI'

    def default_show_file(self, fields):
        """Run the export and return the defaults of the download form."""
        if not Transaction().context['active_ids']:
            return {'file': None, 'file_name': None}
        message, edi_file_name = self.generate_message(
            export_again=self.params.export_again)
        if not message:
            return {}
        # mkstemp hands back an open descriptor and leaves the file on disk:
        # wrap the descriptor so it gets closed and always remove the file.
        # The text-mode round trip is kept because reading back applies
        # newline translation, so skipping it could alter the content.
        file_no, file_name = tempfile.mkstemp('.txt', 'tryton_')
        try:
            with os.fdopen(file_no, 'w+') as file_p:
                file_p.write(message)
            with open(file_name, mode='r') as file_p:
                content = file_p.read()
        finally:
            os.remove(file_name)
        self.show_file.file = content
        self.show_file.file_name = edi_file_name
        return {'file': self.show_file.file, 'file_name': edi_file_name}
class EdocumentExportParams(ModelView):
    '''EDIFACT Export Params'''
    __name__ = 'edifact_export.params'

    # When set, records that already have EDI messages are exported too.
    export_again = fields.Boolean('Export Again')
class EdocumentExportFile(ModelView):
    '''EDIFACT Export File'''
    __name__ = 'edifact_export.file'

    # Content of the generated message.
    file = fields.Binary('File', readonly=True)
    # Name proposed for the file.
    file_name = fields.Char('File Name')