trytond-edocument_edifact/edocument.py

318 lines
10 KiB
Python

# The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
from datetime import datetime
from trytond.model import ModelSQL, ModelView, fields, Exclude
from sql.conditionals import Coalesce
from sql.operators import Equal as SqlEqual
from trytond.pool import Pool
from trytond.transaction import Transaction
from trytond.i18n import gettext
from trytond.exceptions import UserError
from genshi.template import TextTemplate
from io import open
import oyaml as yaml
import os
# Lower-case file-name suffixes that EdocumentFileManager._get_messages
# accepts when scanning a folder for EDI documents; any other file is skipped.
KNOWN_EXTENSIONS = ['.txt', '.edi', '.pla']
class EdocumentTemplate(ModelView, ModelSQL):
    """Electronic Document Template"""
    __name__ = 'edocument.template'

    # Kind of EDIFACT document the template is meant for.
    message_type = fields.Selection([
            ('DESADV', 'DESADV'),
            ('INVOIC', 'INVOIC'),
            ], 'Document', required=True)
    # Optional party the template is tied to.
    party = fields.Many2One('party.party', 'Party')
    # Template source text.
    template = fields.Text('Template')
    # Encoding associated with the template (defaults to 'utf-8').
    encoding = fields.Selection([
            ('utf-8', 'Utf-8'),
            ('cp1252', 'ANSI'),
            ], 'Encoding', required=True)

    @classmethod
    def __setup__(cls):
        """Register the exclusion constraint on (message_type, party)."""
        super().__setup__()
        table = cls.__table__()
        # A missing party is coalesced to -1 so that rows without a party
        # are compared as equal to each other by the constraint.
        one_per_party = Exclude(
            table,
            (table.message_type, SqlEqual),
            (Coalesce(table.party, -1), SqlEqual))
        cls._sql_constraints = [
            ('document_party_exclude', one_per_party,
                'edocument_edifact.msg_edi_party_message_type'),
            ]

    @classmethod
    def default_encoding(cls):
        """Default value of the ``encoding`` field."""
        return 'utf-8'

    def get_rec_name(self, name):
        """Return "<message_type> <party name>"; the name part is empty
        when there is no party or the party has no name."""
        party_name = ''
        if self.party and self.party.name:
            party_name = self.party.name
        return ' '.join((self.message_type, party_name))
class EdocumentMessage(ModelView, ModelSQL):
    """EDIFACT message"""
    __name__ = 'edocument.message'

    code = fields.Char('Code')
    message = fields.Text('Message')
    sequence = fields.Many2One('ir.sequence', 'Sequence')
    # Record the message belongs to; allowed models come from get_origin().
    origin = fields.Reference('Origin', selection='get_origin')

    @classmethod
    def _get_origin(cls):
        """Return the model names allowed as origin (none by default)."""
        return []

    @classmethod
    def get_origin(cls):
        """Build the selection of the ``origin`` field from _get_origin()."""
        IrModel = Pool().get('ir.model')
        allowed = IrModel.search([
                ('model', 'in', cls._get_origin()),
                ])
        selection = [(None, '')]
        selection.extend((m.model, m.name) for m in allowed)
        return selection

    def get_rec_name(self, name):
        """Return 'EW' followed by the code left-padded with zeros to six
        characters."""
        return 'EW{0:0>6}'.format(self.code)

    @classmethod
    def create(cls, vlist):
        """Create messages, filling ``code`` from the configured sequence.

        The incoming dictionaries are copied, never modified. When the
        configuration has an ``edocument_sequence``, each copy gets a code
        taken from the sequence resolved for the company in the values or,
        failing that, the company in the transaction context.
        """
        Configuration = Pool().get('edocument.configuration')
        config = Configuration(1)
        context_company = Transaction().context.get('company', None)

        prepared = [values.copy() for values in vlist]
        for values in prepared:
            if not config.edocument_sequence:
                continue
            company = values.get('company', context_company)
            sequence = config.get_multivalue(
                'edocument_sequence', company=company)
            values['code'] = sequence.get()
        return super().create(prepared)
class EdocumentMixin(object):
    """Fields and helpers shared by models linked to EDI messages."""

    # Messages whose ``origin`` points at this record.
    messages = fields.One2Many('edocument.message', 'origin', 'Message')
    edocument_processed = fields.Function(
        fields.Boolean('Processed'), 'get_edocument_processed',
        searcher='search_edocument_processed')
    edi_message = fields.Function(
        fields.Char('EDI Message'), 'get_edi_message')

    def get_edocument_processed(self, name=None):
        """A record counts as processed once it has at least one message."""
        message_count = len(self.messages)
        return message_count > 0

    @classmethod
    def search_edocument_processed(cls, name, clause):
        """Translate a clause on ``edocument_processed`` into a domain on
        ``messages``.

        Any operator other than '=' is handled as a negation.
        """
        is_equal = bool(clause[1] == '=')
        operand = bool(clause[2])
        # ('=', True) and (non-'=', False) both ask for processed records.
        if is_equal != operand:
            return [('messages', '=', None)]
        return [('messages', '!=', None)]

    def get_edi_message(self, name=None):
        """Return the rec_name of the last message, or '' without messages."""
        if not self.messages:
            return ''
        return self.messages[-1].rec_name
class EdocumentImportMixin(EdocumentMixin):
    """Mixin for models whose records are created from incoming EDI
    documents.

    Subclasses are expected to provide ``_get_template_name`` and
    ``create_from_edi``; ``postprocess`` is an optional hook.
    """

    @classmethod
    def _get_template_name(cls):
        """Hook: return the path of the YAML template read by _template().

        The default implementation returns None.
        """
        pass

    @classmethod
    def create_from_edi(cls, edi_info, template):
        """Hook: turn one EDI document into record values.

        :param edi_info: content of the document, as returned by the
            document manager.
        :param template: parsed template (see _template()).
        :return: a ``(created, errors)`` pair, as unpacked by
            _create_objects_from_edi(): ``created`` is an iterable of
            dictionaries of field values (an ``'id'`` key selects an
            existing record) and ``errors`` is handed to the document
            manager's ``_handle_errors``.

        The default implementation returns None.
        """
        pass

    @classmethod
    def postprocess(cls, objects):
        """Hook called by get_objects_from_edi() with the saved records."""
        pass

    @classmethod
    def _source_uri(cls):
        """Return the configured ``import_path``."""
        pool = Pool()
        Configuration = pool.get('edocument.configuration')
        config = Configuration(0)
        return config.import_path

    @classmethod
    def _error_uri(cls):
        """Return the configured ``error_path``."""
        pool = Pool()
        Configuration = pool.get('edocument.configuration')
        config = Configuration(0)
        return config.error_path

    @classmethod
    def _template(cls):
        """Load and parse the YAML template named by _get_template_name()."""
        template_path = cls._get_template_name()
        with open(template_path, encoding='utf-8') as fp:
            # The Loader must be passed explicitly: PyYAML >= 6 raises
            # TypeError when it is omitted.
            # NOTE(review): yaml.Loader can build arbitrary Python objects,
            # so the template file must be trusted; consider SafeLoader if
            # templates only contain plain YAML - confirm before switching.
            return yaml.load(fp.read(), Loader=yaml.Loader)

    @classmethod
    def _get_document_manager(cls):
        """Return the object used to fetch documents and dispose of them."""
        return EdocumentFileManager()

    @classmethod
    def _create_objects_from_edi(
            cls, source_uri, error_uri, template=None, target_model=None):
        """
        Get objects from edi files

        :param source_uri: location the document manager reads from.
        :param error_uri: location the document manager reports errors to.
        :param template: parsed template; loaded with _template() if empty.
        :param target_model: name of the model to instantiate; defaults to
            this model.
        :return: the list of saved records.
        """
        pool = Pool()
        if not target_model:
            target_model = cls.__name__
        Object = pool.get(target_model)
        Message = pool.get('edocument.message')
        if not template:
            template = cls._template()

        objects = []
        messages = []
        imported = []
        manager = cls._get_document_manager()
        documents = manager._get_messages(source_uri, error_uri)
        for name, info in documents.items():
            created, errors = cls.create_from_edi(info, template)
            if created:
                # NOTE(review): the whole document content is stored in
                # ``code`` here; confirm this is not meant to be ``message``.
                message = Message(code=info)
                messages.append(message)
                imported.append(name)
                for value in created:
                    if 'id' in value:
                        new_object = Object(value['id'])
                    else:
                        new_object = Object()
                    for k, v in value.items():
                        setattr(new_object, k, v)
                    new_object.message = message
                    message.origin = new_object
                    objects.append(new_object)
            # Errors belong to the document just processed, so they are
            # reported per document, inside the loop.
            if errors:
                manager._handle_errors(name, errors, error_uri)

        Message.save(messages)
        Object.save(objects)
        # Dispose of the imported documents exactly once, and only after the
        # records were saved, so a failing save does not lose the sources.
        if imported:
            manager._handle_imported(imported)
        return objects

    @classmethod
    def get_objects_from_edi(cls, target_model=None):
        '''Get objects from edi

        Imports from the configured import path, runs postprocess() on the
        saved records and returns them.
        '''
        results = cls._create_objects_from_edi(
            cls._source_uri(),
            cls._error_uri(),
            target_model=target_model)
        cls.postprocess(results)
        return results
class EdocumentFileManager(object):
    ''' Imports EDI objects from a folder '''

    @classmethod
    def _read_file(cls, filename):
        '''Return the content of *filename*, decoded as UTF-8.'''
        with open(filename, encoding='utf-8') as fp:
            return fp.read()

    @classmethod
    def _get_messages(cls, uri, error_uri):
        '''Return ``{path: content}`` for the EDI files found in folder *uri*.

        Only regular files whose extension (compared in lower case) is
        listed in KNOWN_EXTENSIONS are read. *error_uri* is not used here.
        '''
        messages = {}
        for entry in os.listdir(uri):
            path = os.path.join(uri, entry)
            if not os.path.isfile(path):
                continue
            # splitext copes with extensions of any length, unlike slicing
            # a fixed number of trailing characters.
            extension = os.path.splitext(path)[1].lower()
            if extension in KNOWN_EXTENSIONS:
                messages[path] = cls._read_file(path)
        return messages

    @classmethod
    def _handle_errors(cls, fname, errors, error_uri):
        '''Write *errors*, one per line, to ``error_<name>.EDI`` in folder
        *error_uri*, where ``<name>`` is the base name of *fname* without
        its extension.'''
        base_name = os.path.splitext(os.path.basename(fname))[0]
        error_fname = os.path.join(
            error_uri, 'error_{}.EDI'.format(base_name))
        # Explicit UTF-8, matching how the source documents are read, so
        # the result does not depend on the platform default encoding.
        with open(error_fname, 'w', encoding='utf-8') as fp:
            fp.write('\n'.join(errors))

    @classmethod
    def _handle_imported(cls, imported):
        '''Delete every file path listed in *imported*.'''
        for path in imported:
            os.remove(path)
class EdocumentExportMixin(object):
    """Mixin that renders EDI messages for the active records and, when an
    export path is configured, writes them to disk."""

    def _write_file(self, file_path, file, encoding='utf-8'):
        """Write the text *file* to *file_path* using *encoding*.

        An existing file is overwritten. The handle is closed even when
        the write fails.
        """
        with open(file_path, 'w+', encoding=encoding) as edi_file:
            edi_file.write(file)

    def _edi_template(self, message_type, party):
        """Return ``(template_text, encoding)`` for *message_type* and
        *party*.

        The first ``edocument.template`` record matching both values is
        used. Otherwise the text file ``template/<message_type>.txt``
        (lower-cased name) next to this module is read, with 'utf-8' as
        encoding.
        """
        pool = Pool()
        EdocumentTemplate = pool.get('edocument.template')
        # NOTE(review): a template record without party is never matched by
        # this search; confirm whether such records should act as fallback.
        templates = EdocumentTemplate.search([
            ('message_type', '=', message_type),
            ('party', '=', party)])
        if templates:
            return templates[0].template, templates[0].encoding
        template_path = os.path.join(os.path.dirname(__file__),
            'template', message_type.lower() + '.txt')
        # Read with the same encoding that is reported to the caller rather
        # than the platform default.
        with open(template_path, encoding='utf-8') as fp:
            return fp.read(), 'utf-8'

    def generate_message(self, message_type, export_again=False):
        """Create and render one EDI message per active record.

        The records are those of ``self.model`` whose ids are in the
        ``active_ids`` context key; unless *export_again* is set, records
        already processed are skipped.

        :param message_type: template selector, also used as prefix of the
            returned file name.
        :param export_again: include records that already have a message.
        :return: ``(None, None)`` when no record qualifies, otherwise the
            text and the file name ``<message_type><code>.EDI`` of the last
            generated message.
        :raises UserError: when the records belong to several companies,
            the company has no 'EDI_sender' identifier, the party has no
            'EDI_receiver' identifier, or a message gets no code.
        """
        pool = Pool()
        ActiveModel = self.model
        Message = pool.get('edocument.message')

        domain = [('id', 'in', Transaction().context['active_ids'])]
        if not export_again:
            domain.append(('edocument_processed', '=', False))
        records = ActiveModel.search(domain)
        if not records:
            return None, None

        companies = set(x.company for x in records)
        if len(companies) > 1:
            raise UserError(gettext('edocument_edifact.msg_company_unique'))
        company, = companies
        edifact_sender = [x for x in company.party.identifiers
            if x.type == 'EDI_sender']
        if not edifact_sender:
            raise UserError(gettext('edocument_edifact.msg_EDI_sender',
                company=company.party.name))

        parties = set(x.party for x in records)
        # NOTE(review): records with different parties make this unpacking
        # raise a bare ValueError instead of a UserError.
        party, = parties
        edifact_receiver = [x for x in party.identifiers
            if x.type == 'EDI_receiver']
        if not edifact_receiver:
            raise UserError(gettext('edocument_edifact.msg_party_EDI_receiver',
                party=party.name))

        template, encoding = self._edi_template(message_type, party)
        loader = TextTemplate(template)
        # The configuration is looked up once; it was re-read per record.
        conf = pool.get('edocument.configuration')(0)
        export_path = conf.export_path
        stripped = ('.', ' ', '-', ':')

        for record in records:
            message = Message()
            message.origin = record
            message.save()
            # Fail before rendering and writing anything to disk when the
            # message did not get a code.
            if not message.code:
                raise UserError(gettext('edocument_edifact.msg_EDI_sequence'))
            data = {
                "message": message,
                "sender": edifact_sender[0].code,
                "receiver": edifact_receiver[0].code,
                "records": [[message.code, record]],
                "date": message.create_date.strftime("%y%m%d"),
                "time": message.create_date.strftime("%H%M")
            }
            message.message = loader.generate(
                data=data, items=[1, 2, 3]).render()
            message.save()
            record.save()
            if export_path:
                # File name: 'EDI' + current timestamp without separators.
                f_name = ''.join(
                    c for c in str(datetime.now()) if c not in stripped)
                name = 'EDI' + f_name + ".edi"
                file_name = os.path.join(export_path, name)
                self._write_file(file_name, message.message, encoding=encoding)
        return message.message, message_type + message.code + '.EDI'