# The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
from datetime import datetime
from trytond.model import ModelSQL, ModelView, fields, Exclude
from sql.conditionals import Coalesce
from sql.operators import Equal as SqlEqual
from trytond.pool import Pool
from trytond.pyson import PYSONEncoder
from trytond.transaction import Transaction
from trytond.i18n import gettext
from trytond.exceptions import UserError
from trytond.wizard import StateTransition, StateView, Button
from genshi.template import TextTemplate
from io import open
import oyaml as yaml
import os
import tempfile

# File name endings accepted by EdocumentFileManager._get_messages. The
# check there compares the last four characters of the file name, lowercased.
KNOWN_EXTENSIONS = ['.txt', '.edi', '.pla']


# Stores the template text and output encoding for a message type,
# optionally for one party.
class EdocumentTemplate(ModelView, ModelSQL):
    """Electronic Document Template"""
    __name__ = 'edocument.template'

    message_type = fields.Selection([], 'Document', required=True)
    party = fields.Many2One('party.party', 'Party')
    template = fields.Text('Template')
    encoding = fields.Selection([
            ('utf-8', 'Utf-8'),
            ('cp1252', 'ANSI'),
            ], 'Encoding', required=True)

    @classmethod
    def default_encoding(cls):
        """Return the default value of the ``encoding`` field."""
        return 'utf-8'

    @classmethod
    def __setup__(cls):
        """Register the exclusion constraint on message type and party."""
        super(EdocumentTemplate, cls).__setup__()
        t = cls.__table__()
        # Two rows conflict when both message_type and party are equal.
        # Coalesce maps an empty party to -1 so rows without a party also
        # compare as equal to each other.
        cls._sql_constraints = [
            ('document_party_exclude',
                Exclude(t,
                    (t.message_type, SqlEqual),
                    (Coalesce(t.party, -1), SqlEqual)),
                'edocument_edifact.msg_edi_party_message_type')]

    def get_rec_name(self, name):
        """Return the message type and the party name joined by a space.

        When there is no party (or its name is falsy) the second part is
        the empty string.
        """
        party_name = (self.party and self.party.name) or ''
        return ' '.join([self.message_type, party_name])


# Stores the text of one message together with the record it refers to.
class EdocumentMessage(ModelView, ModelSQL):
    """EDIFACT message"""
    __name__ = 'edocument.message'

    code = fields.Char('Code', readonly=True)
    message = fields.Text('Message', readonly=True)
    origin = fields.Reference('Origin', selection='get_origin', readonly=True)

    def get_rec_name(self, name):
        """Return 'EW' followed by the code left-padded with '0' to 6 chars."""
        return 'EW' + '{0:0>6}'.format(self.code)

    @classmethod
    def get_origin(cls):
        """Return the selection list for the ``origin`` reference field.

        The list starts with an empty entry, followed by one
        ``(model, name)`` pair for each ``ir.model`` record whose model is
        listed by ``_get_origin``.
        """
        IrModel = Pool().get('ir.model')
        models = cls._get_origin()
        models = IrModel.search([
            ('model', 'in', models),
            ])
        return [(None, '')] + [(m.model, m.name) for m in models]

    @classmethod
    def _get_origin(cls):
        """Return the model names allowed as origin (none by default)."""
        return []

    @classmethod
    def create(cls, vlist):
        """Create messages, filling ``code`` from the configured sequence.

        A code is only generated for values that have no code yet, and only
        when the configuration defines ``edocument_sequence``. The company
        used to resolve the sequence is the one in the values, falling back
        to the ``company`` key of the transaction context.
        """
        pool = Pool()
        Configuration = pool.get('edocument.configuration')

        config = Configuration(1)
        default_company = Transaction().context.get('company', None)
        # Copy each dict so the caller's values are not modified.
        vlist = [x.copy() for x in vlist]
        for values in vlist:
            if not values.get('code') and config.edocument_sequence:
                values['code'] = config.get_multivalue(
                    'edocument_sequence',
                    company=values.get('company', default_company)).get()
        return super().create(vlist)


class EdocumentMixin(object):
    """Mixin adding the list of EDI messages and derived fields to a model."""

    edi_messages = fields.One2Many('edocument.message', 'origin',
        'EDI Messages')
    edi_processed = fields.Function(
        fields.Boolean('EDI Processed'), 'get_edi_processed',
        searcher='search_edi_processed')
    edi_message = fields.Function(
        fields.Char('EDI Message'), 'get_edi_message')

    def get_edi_processed(self, name=None):
        """Return True when the record has at least one EDI message."""
        return bool(self.edi_messages)

    @classmethod
    def search_edi_processed(cls, name, clause):
        """Translate a clause on ``edi_processed`` into one on messages.

        ``clause[1]`` is the operator and ``clause[2]`` the value. Any
        operator other than '=' is handled like its negation.
        """
        exist = {
            True: [('edi_messages', '=', None)],
            False: [('edi_messages', '!=', None)]
            }
        # XOR is True for "= False" and for "<other operator> True", i.e.
        # when the search asks for records without messages.
        return exist[bool(clause[1] == '=') ^ bool(clause[2])]

    def get_edi_message(self, name=None):
        """Return the rec_name of the last EDI message, or '' if none."""
        return self.edi_messages[-1].rec_name if self.edi_messages else ''

    @classmethod
    def copy(cls, records, default=None):
        """Copy records with ``edi_messages`` forced to None in defaults."""
        if default is None:
            default = {}
        else:
            default = default.copy()
        default['edi_messages'] = None
        return super().copy(records, default=default)

    @property
    def edi_party(self):
        """Party used by the export to group records and find identifiers."""
        return self.party

    def _get_edi_message_key(self, message):
        """Return the key stored next to the record in the template data."""
        return message.rec_name


class EdocumentImportMixin(object):
    """Mixin with the class-level logic to create records from EDI files."""

    _edi_message_type = ''

    @classmethod
    def _get_template_name(cls):
        """Return the path of the YAML template file (hook, no default)."""
        pass

    @classmethod
    def create_object_from_edi(cls, edi_content, template):
        """Build a record from the content of one EDI file (hook).

        ``_create_objects_from_edi`` unpacks the result as
        ``(record, errors)``.
        """
        pass

    @classmethod
    def _postprocess(cls, objects):
        """Hook called with the saved records; returns them unchanged."""
        return objects

    @classmethod
    def _template(cls):
        """Load and return the YAML template given by _get_template_name."""
        template_path = cls._get_template_name()
        with open(template_path, encoding='utf-8') as fp:
            # NOTE(review): yaml.Loader can construct arbitrary Python
            # objects; this is only safe while template files are trusted.
            return yaml.load(fp.read(), Loader=yaml.Loader)

    @classmethod
    def _get_document_manager(cls):
        """Return the manager used to read, report and remove EDI files."""
        return EdocumentFileManager()

    @classmethod
    def _create_objects_from_edi(cls, template=None):
        """
        Get objects from edi files

        Reads the messages returned by the document manager for the
        configured path, creates one record per message through
        ``create_object_from_edi``, saves the records and their
        'edocument.message' entries and asks the manager to handle the
        imported sources.

        :param template: parsed template; loaded with ``_template`` when
            falsy.
        :return: tuple ``(records, error_files)``.
        """
        pool = Pool()
        Configuration = pool.get('edocument.configuration.path')
        ObjectModel = pool.get(cls.__name__)
        Message = pool.get('edocument.message')

        paths = Configuration._get_path(cls._edi_message_type)
        source_uri, error_uri = paths.path, paths.error_path
        if not template:
            template = cls._template()

        records = []
        error_files = []
        messages = []
        to_delete = []
        manager = cls._get_document_manager()
        edi_files = manager._get_messages(source_uri, error_uri)
        for name, info in edi_files.items():
            try:
                record, errors = cls.create_object_from_edi(info, template)
            except RuntimeError:
                # Messages raising RuntimeError are skipped and their
                # source is left untouched.
                continue
            # NOTE(review): the nesting below was reconstructed from a
            # whitespace-collapsed source; confirm against the repository.
            if errors:
                error_files.append(manager._handle_errors(
                    name, errors, error_uri))
            else:
                if record:
                    message = Message(
                        # Message code is the file name without extension.
                        code=os.path.basename(os.path.splitext(name)[0]),
                        message=info,
                        origin=record)
                    messages.append(message)
                    records.append(record)
                to_delete.append(name)
        ObjectModel.save(records)
        if messages:
            Message.save(messages)
        records = cls._postprocess(records)
        if to_delete:
            manager._handle_imported(to_delete)
        return records, error_files

    @classmethod
    def get_objects_from_edi(cls):
        """Create records from EDI files using the default template."""
        return cls._create_objects_from_edi()


class EdocumentFileManager(object):
    '''
    Imports EDI objects from a folder
    '''

    @classmethod
    def _read_file(cls, filename):
        """Return the content of ``filename`` decoded as cp1252."""
        with open(filename, 'r', encoding='cp1252') as file:
            return file.read()

    @classmethod
    def _get_messages(cls, uri, error_uri):
        """Return ``{path: content}`` for the known files found in ``uri``.

        Only regular files whose last four characters (lowercased) are in
        KNOWN_EXTENSIONS are read. ``error_uri`` is not used here.
        """
        files = [os.path.join(uri, file) for file in os.listdir(uri)
            if os.path.isfile(os.path.join(uri, file))]
        return {fname: cls._read_file(fname) for fname in files
            if fname[-4:].lower() in KNOWN_EXTENSIONS}

    @classmethod
    def _handle_errors(cls, fname, errors, error_uri):
        """Write ``errors`` to ``error_<name>.EDI`` inside ``error_uri``.

        ``<name>`` is the base name of ``fname`` without extension and the
        errors are written one per line.

        :return: path of the error file written.
        """
        error_fname = os.path.join(
            error_uri,
            'error_{}.EDI'.format(
                os.path.splitext(os.path.basename(fname))[0]))
        with open(error_fname, 'w') as fp:
            fp.write('\n'.join(errors))
        return error_fname

    @classmethod
    def _handle_imported(cls, imported):
        """Delete every file listed in ``imported``."""
        for file in imported:
            os.remove(file)


# View model shown by the import wizard when some files produced errors.
class EDocumentImportResult(ModelView):
    """Import EDIFACT Result"""
    __name__ = 'edifact_import.result'

    error_file = fields.Binary('Errors', filename='error_filename',
        readonly=True)
    error_filename = fields.Char('Filename')
    message = fields.Text('Message', readonly=True)


def import_edocument_mixin(model_name, result_field):
    """Return a wizard mixin class that imports ``model_name`` from EDI.

    :param model_name: name of the pool model providing
        ``_edi_message_type`` and ``get_objects_from_edi``.
    :param result_field: attribute of the ``errors`` state view that holds
        the imported records.
    """

    class ImportEDocumentMixin(object):

        start = StateTransition()
        errors = StateView('edifact_import.result',
            'edocument_edifact.import_result_view_form', [
                Button('OK', 'pre_open', 'tryton-ok', default=True),
                ])
        pre_open = StateTransition()

        @property
        def result_records(self):
            """Imported records kept on the ``errors`` view ([] if unset)."""
            return getattr(self.errors, result_field, [])

        @result_records.setter
        def result_records(self, value):
            setattr(self.errors, result_field, [s.id for s in (value or [])])

        def transition_start(self):
            """Run the import and choose the next state.

            When error files were produced, they are merged into a single
            'error_file' inside the configured error path, its content is
            stored on the ``errors`` view and 'errors' is returned;
            otherwise 'pre_open' is returned.
            """
            pool = Pool()
            ActiveModel = pool.get(model_name)
            Configuration = pool.get('edocument.configuration.path')

            paths = Configuration._get_path(ActiveModel._edi_message_type)
            results, error_files = ActiveModel.get_objects_from_edi()
            self.result_records = results
            if not error_files:
                return 'pre_open'

            error_file = os.path.join(paths.error_path, 'error_file')
            with open(error_file, 'w') as ef:
                for error_path in error_files:
                    # os.path.basename instead of splitting on '/' so the
                    # directory is removed whatever separator os.path.join
                    # produced.
                    title = os.path.basename(error_path).split('error_')[-1]
                    ef.write('%s:\n' % title)
                    with open(error_path, 'r') as f:
                        for line in f:
                            ef.write('\t- %s' % line)
                    # NOTE(review): written once per error file; position
                    # reconstructed from a whitespace-collapsed source.
                    ef.write('\n')
            with open(error_file, mode='r') as f:
                self.errors.error_file = f.read()
            return 'errors'

        def default_errors(self, fields):
            """Return the default values for the ``errors`` state view."""
            return {
                'error_file': self.errors.error_file,
                'error_filename': 'EDI_error.txt',
                'message': self.errors.error_file,
                result_field: [r.id for r in self.result_records]
                }

        def transition_pre_open(self):
            """Always continue to the 'open_' state."""
            return 'open_'

        def do_open_(self, action):
            """Restrict the opened action to the imported records.

            Returns None when nothing was imported.
            """
            if not self.result_records:
                return
            action['pyson_domain'] = PYSONEncoder().encode([
                ('id', 'in', [s.id for s in self.result_records])
                ])
            return action, {}

    return ImportEDocumentMixin


class EdocumentExportMixin(object):
    """Wizard mixin that renders records as EDI messages and writes them."""

    _message_type = ''

    start = StateTransition()
    params = StateView('edifact_export.params',
        'edocument_edifact.export_params_view_form', [
            Button('Cancel', 'end', 'tryton-cancel'),
            Button('Export', 'show_file', 'tryton-forward')
            ])
    show_file = StateView('edifact_export.file',
        'edocument_edifact.export_file_view_form', [
            Button('OK', 'end', 'tryton-ok')
            ])

    def transition_start(self):
        """Always continue to the 'params' state."""
        return 'params'

    @classmethod
    def _default_template_path_file(cls):
        """Return a file path located next to the 'template' folder (hook)."""
        pass

    def _default_template_path(self):
        """Return ``<dir>/template/<message type lowercased>.txt``.

        ``<dir>`` is the directory of ``_default_template_path_file()``.
        """
        return os.path.join(
            os.path.dirname(self._default_template_path_file()),
            'template',
            self._message_type.lower() + '.txt')

    def _write_file(self, file_path, file, encoding='utf-8'):
        """Write the text ``file`` to ``file_path`` using ``encoding``."""
        with open(file_path, 'w+', encoding=encoding) as edi_file:
            edi_file.write(file)

    def _edi_template(self, party):
        """Return ``(template text, encoding)`` for ``party``.

        The first 'edocument.template' matching the message type and the
        party is used; otherwise the default template file is read and
        'utf-8' is returned as encoding.
        """
        pool = Pool()
        EdocumentTemplate = pool.get('edocument.template')

        templates = EdocumentTemplate.search([
                ('message_type', '=', self._message_type),
                ('party', '=', party)])
        if templates:
            return templates[0].template, templates[0].encoding
        template_path = self._default_template_path()
        with open(template_path) as file:
            template = file.read()
        return template, 'utf-8'

    def generate_message(self, export_again=False):
        """Create, render and write one EDI message per active record.

        :param export_again: when False, records already processed
            (``edi_processed``) are left out.
        :return: ``(message text, file name)`` of the last message
            generated, or ``(None, None)`` when no record matches.
        :raises UserError: when the records belong to several companies or
            several EDI parties, when the company lacks an 'EDI_sender'
            identifier, when the party lacks an 'EDI_receiver' identifier,
            or when the last message has no code.
        """
        pool = Pool()
        ActiveModel = self.model
        Message = pool.get('edocument.message')
        Configuration = pool.get('edocument.configuration.path')

        domain = [('id', 'in', Transaction().context['active_ids'])]
        if not export_again:
            domain.append(('edi_processed', '=', False))
        records = ActiveModel.search(domain)
        if not records:
            return None, None

        companies = set(x.company for x in records)
        if len(companies) > 1:
            raise UserError(gettext('edocument_edifact.msg_company_unique'))
        company, = companies
        edifact_sender = [x for x in company.party.identifiers
            if x.type == 'EDI_sender']
        if not edifact_sender:
            raise UserError(gettext('edocument_edifact.msg_EDI_sender',
                company=company.party.name))

        parties = set(x.edi_party for x in records)
        if len(parties) > 1:
            raise UserError(
                gettext('edocument_edifact.msg_party_receiver_unique'))
        party, = parties
        edifact_receiver = [x for x in party.identifiers
            if x.type == 'EDI_receiver']
        if not edifact_receiver:
            raise UserError(gettext('edocument_edifact.msg_party_EDI_receiver',
                party=party.name))

        template, encoding = self._edi_template(party)
        loader = TextTemplate(template)
        # NOTE(review): the extent of this loop was reconstructed from a
        # whitespace-collapsed source; confirm against the repository.
        for record in records:
            message = Message()
            message.origin = record
            # Saved before rendering: the template data below reads
            # create_date from the message.
            message.save()
            data = {
                "message": message,
                "sender": edifact_sender[0].code,
                "receiver": edifact_receiver[0].code,
                "records": [[record._get_edi_message_key(message), record]],
                "date": message.create_date.strftime("%y%m%d"),
                "time": message.create_date.strftime("%H%M")
                }
            message.message = loader.generate(
                data=data, items=[1, 2, 3]).render()
            message.save()

            conf = Configuration._get_path(self._message_type)
            # File name: 'EDI' + current timestamp without separators.
            chars = ['.', ' ', '-', ':']
            f_name = ''.join(
                [c for c in str(datetime.now()) if c not in chars])
            name = 'EDI' + f_name + ".edi"
            file_name = os.path.join(conf.path, name)
            self._write_file(file_name, message.message, encoding=encoding)

        if not message.code:
            raise UserError(gettext('edocument_edifact.msg_EDI_sequence'))
        return message.message, self._message_type + message.code + '.EDI'

    def default_show_file(self, fields):
        """Return the default values for the ``show_file`` state view.

        Generates the messages for the active records and exposes the
        returned text and file name.
        """
        if not Transaction().context['active_ids']:
            return {'file': None, 'file_name': None}

        message, edi_file_name = self.generate_message(
            export_again=self.params.export_again)
        if not message:
            return {}

        file_no, file_name = tempfile.mkstemp('.txt', 'tryton_')
        try:
            # mkstemp returns an open descriptor that the caller owns;
            # wrapping it makes the ``with`` block close it.
            with os.fdopen(file_no, 'w+') as file_p:
                file_p.write(message)
            # The text is read back through a text-mode file, as before.
            with open(file_name, mode='r') as file_p:
                self.show_file.file = file_p.read()
        finally:
            # The temporary file is not referenced after this point.
            os.remove(file_name)
        self.show_file.file_name = edi_file_name
        return {'file': self.show_file.file, 'file_name': edi_file_name}


# Parameters asked to the user before exporting.
class EdocumentExportParams(ModelView):
    '''EDIFACT Export Params'''
    __name__ = 'edifact_export.params'

    export_again = fields.Boolean('Export Again')


# Result of the export: generated text and suggested file name.
class EdocumentExportFile(ModelView):
    '''EDIFACT Export File'''
    __name__ = 'edifact_export.file'

    file = fields.Binary('File', readonly=True)
    file_name = fields.Char('File Name')