# -*- coding: utf-8 -*- # This file is part of lims_account_invoice module for Tryton. # The COPYRIGHT file at the top level of this repository contains # the full copyright notices and license terms. from datetime import datetime from email import encoders from email.mime.base import MIMEBase from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from time import time from sql import Literal from sql.functions import Substring, Position import logging from trytond.model import Workflow, ModelView, ModelSQL, fields from trytond.wizard import Wizard, StateTransition, StateView, Button from trytond.pool import Pool, PoolMeta from trytond.pyson import Eval, Bool, Or, If from trytond.transaction import Transaction from trytond.tools import get_smtp_server from trytond.config import config from trytond.exceptions import UserError from trytond.i18n import gettext logger = logging.getLogger(__name__) class Invoice(metaclass=PoolMeta): __name__ = 'account.invoice' no_send_invoice = fields.Boolean('No send invoice', states={'invisible': Eval('type') == 'in'}, depends=['type'], help='If checked, then the invoice will not be mailed to contacts.') invoice_contacts = fields.One2Many('account.invoice.invoice_contacts', 'invoice', 'Invoice contacts', states={'invisible': Eval('type') == 'in'}, depends=['type']) sent = fields.Boolean('Sent', readonly=True, help='If checked, then the invoice was mailed to contacts.') sent_date = fields.DateTime('Sent date', readonly=True, states={'invisible': Bool(Eval('no_send_invoice'))}, depends=['no_send_invoice']) entries_comments = fields.Text('Entries comments', states={'invisible': Eval('type') == 'in'}, depends=['type'], readonly=True) @classmethod def __setup__(cls): super().__setup__() cls._check_modify_exclude.update({'sent', 'sent_date', 'invoice_contacts', 'no_send_invoice'}) @classmethod def view_attributes(cls): return super().view_attributes() + [ ('/form/notebook/page[@id="contacts"]', 'states', { 'invisible': Eval('type') == 'in', }), ] @fields.depends('party') def on_change_party(self): super().on_change_party() self.no_send_invoice = False if self.party: self.no_send_invoice = self.party.no_send_invoice def _credit(self, **values): credit = super()._credit(**values) if self.invoice_contacts: credit.invoice_contacts = [contact._credit() for contact in self.invoice_contacts] credit.no_send_invoice = self.no_send_invoice return credit @classmethod @ModelView.button @Workflow.transition('posted') def post(cls, invoices): cls.check_invoice_contacts(invoices) super().post(invoices) @classmethod def check_invoice_contacts(cls, invoices): for invoice in invoices: if invoice.type == 'out': if (not invoice.no_send_invoice and not invoice.invoice_contacts): raise UserError(gettext( 'lims_account_invoice.msg_not_invoice_contacts')) @classmethod def cron_send_invoice(cls): ''' Cron - Send Of Invoice ''' logger.info('Cron - Send Of Invoice:INIT') t1 = time() # DEBUG pool = Pool() SendOfInvoice = pool.get('account.invoice.send_invoice', type='wizard') invoices = cls.search([ ('type', '=', 'out'), ('sent', '=', False), ('no_send_invoice', '=', False), ('state', 'in', ['posted', 'paid']), ]) logger.info('Cron - Send Of Invoice:Se procesaran %s facturas...', len(invoices)) if invoices: session_id, _, _ = SendOfInvoice.create() send_of_invoice = SendOfInvoice(session_id) with Transaction().set_context(active_ids=[invoice.id for invoice in invoices]): send_of_invoice.transition_start() tt = round(time() - t1, 2) # DEBUG logger.info('Cron - Send Of Invoice:END:Finalizado en %s segundos.', tt) def mail_send_invoice(self): if not self.invoice_report_cache: return from_addr = config.get('email', 'from') to_addrs = [c.contact.email for c in self.invoice_contacts] if not (from_addr and to_addrs): logger.warn('mail_send_invoice():Factura %s:Envio omitido ' 'por no contener contactos.', self.number) # DEBUG return logger.info('mail_send_invoice():INFO:Contactos de factura:' 'emails:(%s)', ','.join(to_addrs)) # DEBUG subject, body = self.subject_body() attachments_data = self.attachment() msg = self.create_msg(from_addr, to_addrs, subject, body, attachments_data) return self.send_msg(from_addr, to_addrs, msg) def subject_body(self): pool = Pool() Config = pool.get('lims.configuration') User = pool.get('res.user') Lang = pool.get('ir.lang') config = Config(1) lang = User(Transaction().user).language if not lang: lang, = Lang.search([ ('code', '=', 'en'), ], limit=1) with Transaction().set_context(language=lang.code): subject = str('%s %s' % (config.mail_send_invoice_subject, self.number)).strip() body = str(config.mail_send_invoice_body) return subject, body def attachment(self): data = [] data.append({ 'content': self.invoice_report_cache, 'format': self.invoice_report_format, 'mimetype': self.invoice_report_format == 'pdf' and 'pdf' or 'vnd.oasis.opendocument.text', 'filename': str(self.number) + '.' + str(self.invoice_report_format), 'name': str(self.number), }) if self.invoice_service_report_cache: data.append({ 'content': self.invoice_service_report_cache, 'format': self.invoice_service_report_format, 'mimetype': self.invoice_service_report_format == 'pdf' and 'pdf' or 'vnd.oasis.opendocument.text', 'filename': str(self.number) + ' (II).' + str(self.invoice_report_format), 'name': str(self.number) + ' (II)', }) return data def create_msg(self, from_addr, to_addrs, subject, body, attachments_data): if not to_addrs: return None msg = MIMEMultipart('mixed') msg['From'] = from_addr hidden = True # TODO: HARDCODE! if not hidden: msg['To'] = ', '.join(to_addrs) msg['Subject'] = subject msg_body = MIMEText('text', 'plain') msg_body.set_payload(body.encode('UTF-8'), 'UTF-8') msg.attach(msg_body) for attachment_data in attachments_data: attachment = MIMEBase('application', 'octet-stream') attachment.set_payload(attachment_data['content']) encoders.encode_base64(attachment) attachment.add_header('Content-Disposition', 'attachment', filename=attachment_data['filename']) msg.attach(attachment) return msg def send_msg(self, from_addr, to_addrs, msg): to_addrs = list(set(to_addrs)) success = False try: server = get_smtp_server() server.sendmail(from_addr, to_addrs, msg.as_string()) server.quit() success = True except Exception: logger.error('Unable to deliver mail for invoice %s', self.number) return success class InvoiceContact(ModelSQL, ModelView): 'Invoice Contact' __name__ = 'account.invoice.invoice_contacts' invoice = fields.Many2One('account.invoice', 'Invoice', ondelete='CASCADE', select=True, required=True) contact = fields.Many2One('party.address', 'Contact', required=True, domain=[('invoice_contact', '=', True)]) def _credit(self): credit = self.__class__() for field in ('invoice', 'contact'): setattr(credit, field, getattr(self, field)) return credit class InvoiceLine(metaclass=PoolMeta): __name__ = 'account.invoice.line' lims_service_party = fields.Function(fields.Many2One('party.party', 'Party', depends=['invoice_type'], states={ 'invisible': Or(Eval('_parent_invoice', {}).get('type') == 'in', Eval('invoice_type') == 'in'), }), 'get_fraction_field', searcher='search_fraction_field') lims_service_entry = fields.Function(fields.Many2One('lims.entry', 'Entry', depends=['invoice_type'], states={ 'invisible': Or(Eval('_parent_invoice', {}).get('type') == 'in', Eval('invoice_type') == 'in'), }), 'get_fraction_field', searcher='search_fraction_field') lims_service_sample = fields.Function(fields.Many2One('lims.sample', 'Sample', depends=['invoice_type'], states={ 'invisible': Or(Eval('_parent_invoice', {}).get('type') == 'in', Eval('invoice_type') == 'in'), }), 'get_fraction_field', searcher='search_fraction_field') lims_service_results_reports = fields.Function(fields.Char( 'Results Reports', depends=['invoice_type'], states={ 'invisible': Or(Eval('_parent_invoice', {}).get('type') == 'in', Eval('invoice_type') == 'in'), }), 'get_results_reports', searcher='search_results_reports') party_domain = fields.Function(fields.Many2Many('party.party', None, None, 'Party domain'), 'get_party_domain') @classmethod def __setup__(cls): super().__setup__() cls.origin.states['readonly'] = True cls.party.domain = ['OR', ('id', '=', Eval('party')), If(Bool(Eval('party_domain')), ('id', 'in', Eval('party_domain')), ('id', '!=', -1))] if 'party_domain' not in cls.party.depends: cls.party.depends.append('party_domain') cls.product.states['readonly'] = Or( Eval('invoice_state') != 'draft', Bool(Eval('lims_service_sample'))) cls.product.depends.append('lims_service_sample') @classmethod def delete(cls, lines): if not Transaction().context.get('delete_service', False): cls.check_service_invoice(lines) super().delete(lines) @classmethod def check_service_invoice(cls, lines): for line in lines: if (line.origin and line.origin.__name__ == 'lims.service' and (not hasattr(line, 'economic_offer') or not line.economic_offer)): raise UserError( gettext('lims_account_invoice.msg_delete_service_invoice', service=line.origin.rec_name)) @classmethod def get_fraction_field(cls, lines, names): result = {} for name in names: result[name] = {} for l in lines: if l.origin and l.origin.__name__ == 'lims.service': # name[13:]: remove 'lims_service_' from field name field = getattr(l.origin.fraction, name[13:], None) result[name][l.id] = field.id if field else None else: result[name][l.id] = None return result @classmethod def search_fraction_field(cls, name, clause): return [('origin.fraction.' + name[13:],) + tuple(clause[1:]) + ('lims.service',)] def _order_service_field(name): def order_field(tables): Service = Pool().get('lims.service') field = Service._fields[name] table, _ = tables[None] service_tables = tables.get('service') if service_tables is None: service = Service.__table__() service_tables = { None: (service, (table.origin.like('lims.service,%') & (Service.id.sql_cast(Substring(table.origin, Position(',', table.origin) + Literal(1))) == service.id))), } tables['service'] = service_tables return field.convert_order(name, service_tables, Service) return staticmethod(order_field) order_lims_service_entry = _order_service_field('entry') @classmethod def get_results_reports(cls, lines, name): pool = Pool() NotebookLine = pool.get('lims.notebook.line') result = {} for l in lines: reports = [] if l.origin and l.origin.__name__ == 'lims.service': notebook_lines = NotebookLine.search([ ('service', '=', l.origin.id), ('results_report', '!=', None), ], limit=1) if notebook_lines: reports = [nl.results_report.rec_name for nl in notebook_lines] if reports: result[l.id] = ', '.join([r for r in reports]) else: result[l.id] = None return result @classmethod def search_results_reports(cls, name, clause): cursor = Transaction().connection.cursor() pool = Pool() ResultsReport = pool.get('lims.results_report') NotebookLine = pool.get('lims.notebook.line') value = clause[2] cursor.execute('SELECT DISTINCT(nl.service) ' 'FROM "' + ResultsReport._table + '" r ' 'INNER JOIN "' + NotebookLine._table + '" nl ' 'ON nl.results_report = r.id ' 'WHERE r.number ILIKE %s', (value,)) services = [x[0] for x in cursor.fetchall()] if not services: return [('id', '=', -1)] services_ids = ['lims.service,' + str(s) for s in services] return [('origin', 'in', services_ids)] @classmethod def _get_origin(cls): models = super()._get_origin() models.append('lims.service') return models @fields.depends('origin') def get_party_domain(self, name=None): pool = Pool() Config = pool.get('lims.configuration') config_ = Config(1) parties = [] if self.origin and self.origin.__name__ == 'lims.service': party = self.origin.party parties.append(party.id) if config_.invoice_party_relation_type: parties.extend([r.to.id for r in party.relations if r.type == config_.invoice_party_relation_type]) return parties class PopulateInvoiceContactsStart(ModelView): 'Populate Invoice Contacts Start' __name__ = 'account.invoice.populate_invoice_contacts.start' class PopulateInvoiceContacts(Wizard): 'Populate Invoice Contacts' __name__ = 'account.invoice.populate_invoice_contacts' start = StateView('account.invoice.populate_invoice_contacts.start', 'lims_account_invoice.account_invoice_populate_invoice_contacts_start' '_view_form', [ Button('Cancel', 'end', 'tryton-cancel'), Button('Populate', 'populate', 'tryton-ok', default=True), ]) populate = StateTransition() def transition_populate(self): logger.info('transition_populate():INIT') t1 = time() # DEBUG pool = Pool() Invoice = pool.get('account.invoice') InvoiceLine = pool.get('account.invoice.line') InvoiceContacts = pool.get('account.invoice.invoice_contacts') EntryInvoiceContacts = pool.get('lims.entry.invoice_contacts') Entry = pool.get('lims.entry') invoice = Invoice(Transaction().context['active_id']) lines = InvoiceLine.search([ ('invoice', '=', invoice.id), ]) if not lines: logger.warn('transition_populate():La factura no ' 'posee lineas! (id: %s)', invoice.id) return 'end' entry_ids = list(set([l.lims_service_entry.id for l in lines if l.lims_service_entry])) if not entry_ids: logger.warn('transition_populate():La factura no ' 'posee lineas asociadas a partidas! (id: %s)', invoice.id) return 'end' # Set entries comments entries_comments = '' entries = Entry.search([('id', 'in', entry_ids)], order=[('id', 'ASC')]) for entry in entries: if not entry.invoice_comments: continue if entries_comments: entries_comments += '\n' entries_comments += '%s: %s' % (entry.number, entry.invoice_comments) invoice.entries_comments = entries_comments invoice.save() entry_invoice_contacts = EntryInvoiceContacts.search([ ('entry', 'in', entry_ids), ]) if not entry_invoice_contacts: logger.warn('transition_populate():Las partidas de ' 'las lineas de la factura, no poseen contactos de ' 'facturacion! (id: %s)', invoice.id) return 'end' contacts_entries = list(set([c.contact for c in entry_invoice_contacts])) contacts_invoice = list(set([c.contact for c in invoice.invoice_contacts])) to_create = [] for contact in contacts_entries: if contact not in contacts_invoice: invoice_contact = InvoiceContacts( invoice=invoice, contact=contact, ) to_create.append(invoice_contact) if not to_create: logger.info('transition_populate():WARN:No se encontraron ' 'nuevos contactos para agregar. (id: %s)', invoice.id) return 'end' InvoiceContacts.save(to_create) tt = round(time() - t1, 2) # DEBUG logger.info('transition_populate():END:Agregado(s) %d contacto(s) ' 'en %s segundos. (id: %s)', len(to_create), tt, invoice.id) return 'end' class SendOfInvoice(Wizard): 'Send Of Invoice' __name__ = 'account.invoice.send_invoice' start = StateTransition() def transition_start(self): logger.info('SendOfInvoice:transition_start():INIT') Invoice = Pool().get('account.invoice') clean_invoice_report_cache = False # TODO: HARDCODE! for active_id in Transaction().context['active_ids']: invoice = Invoice(active_id) if (invoice.type != 'out' or invoice.state not in {'posted', 'paid'}): continue if not invoice.no_send_invoice: if clean_invoice_report_cache: invoice.invoice_report_cache = None invoice.invoice_report_format = None invoice.save() logger.info('SendOfInvoice:transition_start():' 'Factura %s (id: %s)', invoice.number, invoice.id) invoice.print_invoice() invoice.print_invoice_service() if not invoice.mail_send_invoice(): logger.error('SendOfInvoice:transition_start():' 'Factura %s:Envio fallido!', invoice.number) continue logger.info('SendOfInvoice:transition_start():' 'Factura %s:Envio exitoso.', invoice.number) invoice.sent = True invoice.sent_date = datetime.now() invoice.save() logger.info('SendOfInvoice:transition_start():END') return 'end'