trytondo-account_invoice_facho/invoice.py

753 lines
29 KiB
Python

# This file is part of Tryton. The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
from trytond.model import fields, ModelView, ModelSQL
from trytond.pool import PoolMeta, Pool
from trytond.pyson import Eval, And, Or, If, Bool
from trytond.transaction import Transaction
from trytond.rpc import RPC
from trytond.exceptions import UserError
from trytond.config import config
from trytond.report import Report
from facho.fe import form, form_xml
from facho import fe
from facho.fe.client import dian
import io
import qrcode
import zipfile
import hashlib
import tempfile
from contextlib import contextmanager
from datetime import datetime, date
from dateutil import tz
# tzinfo for "America/Bogota"; passed to datetime.now(tz=...) further down
# when the electronic document's period, issue and payment dates are set.
Bogota = tz.gettz("America/Bogota")
class Cron(metaclass=PoolMeta):
    # Extension of ``ir.cron`` that makes the invoice delivery job selectable.
    __name__ = 'ir.cron'

    @classmethod
    def __setup__(cls):
        """Add ``account.invoice|fe_delivery`` to the cron method selection."""
        super().__setup__()
        fe_delivery_job = ('account.invoice|fe_delivery', 'FE Delivery')
        cls.method.selection.append(fe_delivery_job)
class Invoice(metaclass=PoolMeta):
    __name__ = 'account.invoice'

    # ``states`` shared by most fe_* fields: read-only unless the invoice is
    # in draft. Temporary name, removed once the fields are declared.
    _readonly_unless_draft = {'readonly': Eval('state') != 'draft'}

    # TODO: add an fe_identifier attribute and allow selecting it from the form
    fe_delivery_state = fields.Selection(
        [
            ('draft', 'Draft'),
            ('queued', 'Queued'),        # queued locally
            ('delivered', 'Delivered'),  # queued by the remote side
            ('exception', 'Exception'),  # local exception
            ('failure', 'Failure'),      # remote failure
            ('done', 'Done'),            # remote OK
        ],
        'Delivery State',
        states=_readonly_unless_draft)
    fe_delivery_trackid = fields.Char(
        'Delivery TrackID', states=_readonly_unless_draft)
    fe_delivery_status_description = fields.Char(
        'Status Description', states=_readonly_unless_draft)
    fe_delivery_error_message = fields.Text(
        'Error Message', states=_readonly_unless_draft)
    fe_delivery_checked_at = fields.DateTime(
        'Delivery Checked At', states=_readonly_unless_draft)
    fe_cufe = fields.Char('Cufe', states=_readonly_unless_draft)
    fe_qrcode = fields.Char('QR Code', states=_readonly_unless_draft)
    # Computed by ``get_fe_qrcode_img``.
    fe_qrcode_img = fields.Function(
        fields.Binary('QR'), 'get_fe_qrcode_img')
    fe_states = fields.Char('States', states=_readonly_unless_draft)
    # Computed by ``get_fe_states_icon``.
    fe_states_icon = fields.Function(
        fields.Char('Fe States'), 'get_fe_states_icon')
    fe_xml_file = fields.Binary('Fe XML File', states=_readonly_unless_draft)
    fe_operation_type = fields.Selection(
        [
            ('10', 'Estándar'),
            ('20', 'Nota Crédito que referencia una factura electrónica.'),
            ('30', 'Nota Débito que referencia una factura electrónica.'),
        ],
        'Operation Type',
        states={
            'readonly': (Eval('state') != 'draft'),
        },
        depends=['state'])
    # Referenced document; restricted to invoices of the same ``type`` and
    # hidden unless ``credit_note`` is set.
    fe_document_reference = fields.Many2One(
        'account.invoice',
        'Document Reference',
        domain=[('type', '=', Eval('type'))],
        states={
            'invisible': (Bool(~Eval('credit_note'))),
        },
        depends=['type', 'fe_delivery_state', 'credit_note'])

    del _readonly_unless_draft
@classmethod
def __setup__(cls):
    """Register the electronic-invoicing buttons and their read-only rules."""
    super(Invoice, cls).__setup__()

    def posted_or_paid():
        # A fresh expression per button, as in the original declaration.
        return Eval('state').in_(['posted', 'paid'])

    def delivery_closed():
        return Eval('fe_delivery_state').in_(['done', 'exception'])

    fe_buttons = {}
    # XML export buttons are read-only once the invoice is posted or paid.
    fe_buttons['xml'] = {
        'readonly': posted_or_paid(),
        'depends': ['state'],
    }
    fe_buttons['xml_signed'] = {
        'readonly': posted_or_paid(),
        'depends': ['state'],
    }
    # Sending is read-only for closed deliveries and for invoices that are
    # still draft or validated.
    fe_buttons['fe_send'] = {
        'readonly': Or(delivery_closed(),
                       Eval('state').in_(['draft', 'validated'])),
        'depends': ['fe_delivery_state', 'state'],
    }
    fe_buttons['fe_update_status'] = {
        'readonly': delivery_closed(),
        'depends': ['fe_delivery_state'],
    }
    # E-mailing is only available when the delivery state is 'done'.
    fe_buttons['fe_email'] = {
        'readonly': ~Eval('fe_delivery_state').in_(['done']),
        'depends': ['fe_delivery_state'],
    }
    cls._buttons.update(fe_buttons)
@classmethod
def trigger(cls, records, trigger):
    """Action function for the triggers.

    Records whose subtype is flagged ``fe_document`` are marked 'queued',
    saved and handed to ``fe_send``. Every other record is saved with the
    'exception' delivery state and a fixed status description.
    """
    for record in records:
        if not record.subtype.fe_document:
            record.fe_delivery_state = 'exception'
            record.fe_delivery_status_description = 'DOCUMENTO NO ELECTRÓNICO'
            record.save()
            continue
        record.fe_delivery_state = 'queued'
        record.save()
        cls.fe_send([record])
@staticmethod
def default_fe_delivery_state():
    """Default value of ``fe_delivery_state``: 'draft'."""
    return 'draft'
@staticmethod
def default_fe_states():
    """Default value of ``fe_states``: no value."""
    return None
@staticmethod
def default_fe_operation_type():
    """Default value of ``fe_operation_type``: '10' (the 'Estándar' entry)."""
    return '10'
@classmethod
def copy(cls, invoices, default=None):
    """Duplicate ``invoices`` with accounting and delivery data reset.

    Values already present in ``default`` win; the caller's dictionary is
    copied, never modified.
    """
    default = {} if default is None else default.copy()

    accounting_fields = (
        'number',
        'sequence',
        'move',
        'additional_moves',
        'cancel_move',
        'invoice_report_cache',
        'invoice_report_cache_id',
        'invoice_report_format',
        'payment_lines',
        'invoice_date',
        'accounting_date',
        'payment_term_date',
        'lines_to_pay',
    )
    delivery_fields = (
        'fe_delivery_trackid',
        'fe_delivery_status_description',
        'fe_delivery_error_message',
        'fe_delivery_checked_at',
        'fe_cufe',
        'fe_qrcode',
        'fe_xml_file',
        'fe_document_reference',
    )
    for field_name in accounting_fields:
        default.setdefault(field_name, None)
    # The copy starts a new delivery cycle.
    default.setdefault('fe_delivery_state', 'draft')
    for field_name in delivery_fields:
        default.setdefault(field_name, None)
    return super(Invoice, cls).copy(invoices, default=default)
@fields.depends('fe_qrcode')
def get_fe_qrcode_img(self, name):
    """Return the PNG bytes of the QR image built from ``fe_qrcode``.

    When ``fe_qrcode`` is empty the image is built from ``None`` instead.
    """
    payload = self.fe_qrcode if self.fe_qrcode else None
    picture = qrcode.make(payload)
    buffer = io.BytesIO()
    try:
        picture.get_image().save(buffer, 'png')
        return buffer.getvalue()
    finally:
        buffer.close()
def get_email_invoice_count(self, name):
    """Return, as a string, how many notification e-mail logs target this invoice.

    The count is the number of ``notification.email.log`` records whose
    ``resource`` is this invoice. ``name`` is accepted for getter
    compatibility and is not used.
    """
    # The previous lookup of the 'notification.email' record was never used,
    # so only the log count query remains.
    Notification_Email_Log = Pool().get('notification.email.log')
    domain = [('resource', '=', self)]
    return str(Notification_Email_Log.search_count(domain))
def get_fe_states_icon(self, name):
    """Return the icon name summarising the electronic delivery progress.

    - 'facho-not-posted': the invoice is neither posted nor paid
    - 'facho-send-fail': posted/paid but the delivery state is not 'done'
    - 'facho-mail-fail': delivered but ``fe_email_count`` reports 0
    - 'facho-mail-ok': delivered and at least one e-mail logged
    """
    if self.state not in ('posted', 'paid'):
        return 'facho-not-posted'
    if self.fe_delivery_state != 'done':
        return 'facho-send-fail'
    if self.fe_email_count(name) == 0:
        return 'facho-mail-fail'
    return 'facho-mail-ok'
@fields.depends('fe_operation_type', 'credit_note')
def on_change_credit_note(self):
    """Keep ``fe_operation_type`` in step with the ``credit_note`` flag.

    Without the flag the type is '10'. With it, the type is '30' when the
    configuration record has ``dian_fe_debit_note`` set, otherwise '20'.
    """
    Configuration = Pool().get('account_invoice_facho.configuration')
    settings = Configuration(1)
    if not self.credit_note:
        self.fe_operation_type = '10'
    elif settings.dian_fe_debit_note:
        self.fe_operation_type = '30'
    else:
        self.fe_operation_type = '20'
@fields.depends('fe_operation_type', 'credit_note')
def on_change_fe_operation_type(self):
    """Clear ``credit_note`` when operation type '10' is selected."""
    is_standard = self.fe_operation_type == '10'
    if is_standard:
        self.credit_note = False
@fields.depends('fe_operation_type')
def type_code_xml(self, facho_invoice):
    """Wrap ``facho_invoice`` in the XML builder matching this invoice.

    Customer invoices (``type == 'out'``) use the invoice builder for
    operation type '10', the credit note builder for '20' and the debit
    note builder for anything else. Supplier invoices (``type == 'in'``)
    use the support document builder for '10' and the support document
    credit note builder for '20'.

    Raises:
        UserError: when no builder matches the invoice type and operation
            type (previously this surfaced as an UnboundLocalError).
    """
    xml = None
    if self.type == 'out':
        if self.fe_operation_type == '10':
            xml = form_xml.DIANInvoiceXML(facho_invoice)
        elif self.fe_operation_type == '20':
            xml = form_xml.DIANCreditNoteXML(facho_invoice)
        else:
            xml = form_xml.DIANDebitNoteXML(facho_invoice)
    elif self.type == 'in':
        if self.fe_operation_type == '10':
            xml = form_xml.DIANSupportDocumentXML(facho_invoice)
        elif self.fe_operation_type == '20':
            xml = form_xml.DIANSupportDocumentCreditNoteXML(facho_invoice)
    if xml is None:
        raise UserError(
            'Tipo de documento electrónico no soportado '
            '(tipo: %s, operación: %s)' % (self.type, self.fe_operation_type))
    return xml
def fe_email_count(self, name):
    """Return how many ``notification.email.log`` records target this invoice.

    ``name`` is accepted but not used.
    """
    EmailLog = Pool().get('notification.email.log')
    return EmailLog.search_count([('resource', '=', self)])
@classmethod
def check_modify(cls, invoices):
    """Override of the invoice modification check.

    Always returns ``False`` without inspecting ``invoices`` and without
    calling the parent implementation.
    """
    return False
@classmethod
def validate(cls, invoices):
    """Validate invoices for electronic invoicing.

    After the parent validation, every tax of every taxable line must be of
    type 'percentage'; otherwise a ``UserError`` is raised. The facho
    document and its invoice XML are then built, so any error raised while
    building them aborts the validation.
    """
    super().validate(invoices)
    for invoice in invoices:
        for line in invoice.lines:
            for taxes, _unit_price, _quantity, _date_time in line.taxable_lines:
                if any(tax.type != 'percentage' for tax in taxes):
                    raise UserError('Solo se soporta impuesto tipo porcentaje para producto')
        # NOTE(review): the original indentation was lost; this build is
        # assumed to run once per invoice - confirm against the repository.
        form_xml.DIANInvoiceXML(invoice.tofacho())
def tofacho(self):
    """Build the facho document for this invoice.

    Customer invoices (``type == 'out'``) become an invoice, credit note or
    debit note depending on ``fe_operation_type`` ('10', '20', '30').
    Supplier invoices (``type == 'in'``) become a support document, or a
    support document credit note when ``credit_note`` is set.

    Returns:
        The facho document with parties, payment mean and lines set, after
        ``calculate()`` has been called on it.

    Raises:
        UserError: when the invoice type or operation type is not supported,
            when a note lacks its referenced document, or when a party has
            no identifier or no address.
    """

    def build_party(record, type_code=None):
        # Map a party record to a facho Party. ``type_code`` overrides the
        # identification type stored on the party's first identifier.
        if not record.identifiers:
            raise UserError(
                'El tercero %s no tiene un identificador' % record.name)
        if not record.addresses:
            raise UserError(
                'El tercero %s no tiene una dirección' % record.name)
        identifier = record.identifiers[0]
        address = record.addresses[0]
        if type_code is None:
            type_code = identifier.type
        return form.Party(
            legal_name=record.name,
            name=record.full_name,
            ident=form.PartyIdentification(
                str(identifier.code),
                str(identifier.check_digit),
                str(type_code)),
            responsability_code=self.tax_level_code(record.tax_level_code),
            responsability_regime_code="48",
            # Each party reports its own person type; the company used to
            # be given the customer's value here.
            organization_code=record.type_person,
            email=record.email,
            address=form.Address(
                '',
                address.street,
                # NOTE(review): city code '05001' and subentity code '05'
                # are hard-coded for every address - confirm this is intended.
                form.City('05001', address.city),
                form.Country(address.country.code, address.country.name),
                form.CountrySubentity('05', address.subdivision.name),
                form.PostalZone(address.postal_code)),
        )

    # Notes (and supplier credit notes) must point at the original document.
    is_support_credit_note = self.type == 'in' and bool(self.credit_note)
    reference = None
    origin = None
    if self.fe_operation_type in ['20', '30'] or is_support_credit_note:
        origin = self.fe_document_reference
        if not origin:
            raise UserError(
                'El documento debe referenciar una factura electrónica')
        reference = form.InvoiceDocumentReference(
            ident=origin.number,
            uuid=origin.fe_cufe,
            date=origin.invoice_date)

    # Create the electronic document matching the invoice type.
    operation_type = self.fe_operation_type
    if self.type == "out":
        party_type_code = None  # use the type stored on the identifier
        if self.fe_operation_type == '10':
            inv = form.Invoice('01')
        elif self.fe_operation_type == '20':
            inv = form.CreditNote(reference)
        elif self.fe_operation_type == '30':
            inv = form.DebitNote(reference)
        else:
            raise UserError(
                'Tipo de operación no soportado: %s' % self.fe_operation_type)
    elif self.type == "in":
        party_type_code = '31'
        if self.credit_note:
            response = form.SupportDocumentCreditNoteResponse(
                id=origin.number,
                code='5',
                description=self.description)
            inv = form.SupportDocumentCreditNote(reference, response)
            operation_type = '10'
        else:
            inv = form.SupportDocument('05')
    else:
        raise UserError(
            'Tipo de factura no soportado para facturación electrónica: %s'
            % self.type)

    now = datetime.now(tz=Bogota)
    inv.set_period(now, now)
    inv.set_issue(now)
    # Unnumbered invoices get a placeholder identifier.
    inv.set_ident(self.number if self.number else "0000")
    # Set the operation type.
    inv.set_operation_type(operation_type)

    company = build_party(self.company.party)
    party = build_party(self.party, party_type_code)
    # On supplier documents the third party issues and the company receives.
    if self.type == "in":
        supplier, customer = party, company
    else:
        supplier, customer = company, party
    inv.set_supplier(supplier)
    inv.set_customer(customer)

    inv.set_payment_mean(form.PaymentMean(
        id='1',
        code='10',
        due_at=now,
        payment_id='1'))
    for line in self.lines:
        inv.add_invoice_line(line.tofacho())
    inv.calculate()
    return inv
def tax_level_code(self, tax_level):
    """Join the ``code`` of every entry of ``tax_level`` with ';'.

    Args:
        tax_level: iterable of objects exposing a ``code`` attribute, or
            ``None``.

    Returns:
        The codes separated by ';'. Entries without a code are skipped and
        an empty or missing ``tax_level`` yields ''.
    """
    if not tax_level:
        return ''
    return ';'.join(entry.code for entry in tax_level if entry.code)
@staticmethod
@contextmanager
def acquire_public_key(config=None):
    """Expose the DIAN key material as temporary files for a ``with`` block.

    Declared static because the function takes no receiver: it can be
    reached through the class (as the callers in this file do) or through
    an instance without the instance being mistaken for ``config``.

    Args:
        config: object exposing ``dian_fe_public_key``,
            ``dian_fe_private_key``, ``dian_fe_cert`` and
            ``dian_fe_cert_passpharase``. Defaults to record 1 of
            ``account_invoice_facho.configuration``.

    Yields:
        dict with the paths under 'file_public_key', 'file_private_key'
        and 'file_certs', plus the 'passphrase'. The files are closed when
        the block exits.
    """
    from contextlib import ExitStack

    if config is None:
        config = Pool().get('account_invoice_facho.configuration')(1)

    with ExitStack() as stack:
        def create_temporary_file(content):
            # Registered on the stack first, so a failing write or a failing
            # later file does not leave this one open.
            handle = stack.enter_context(tempfile.NamedTemporaryFile())
            handle.write(content)
            handle.flush()
            return handle

        file_public_key = create_temporary_file(config.dian_fe_public_key)
        file_private_key = create_temporary_file(config.dian_fe_private_key)
        file_certs = create_temporary_file(config.dian_fe_cert)
        passphrase = config.dian_fe_cert_passpharase
        yield {'file_public_key': file_public_key.name,
               'file_private_key': file_private_key.name,
               'file_certs': file_certs.name,
               'passphrase': passphrase}
def do_dian_request(self, request):
    """Send ``request`` through a DIAN signature client and return its result.

    The client is built from the private key, public key and passphrase
    provided by ``self.acquire_public_key()``. Callers in this file pass
    the class itself as ``self``.
    """
    with self.acquire_public_key() as key_files:
        signing_client = dian.DianSignatureClient(
            key_files['file_private_key'],
            key_files['file_public_key'],
            key_files['passphrase'])
        response = signing_client.request(request)
    return response
def _force_write(self, params, invoice):
params['fe_delivery_checked_at'] = datetime.now()
invoice.write([invoice], params)
def _dian_zip_io(self, filename, xml_invoice):
    """Return an in-memory DIAN ZIP holding ``xml_invoice`` as ``filename``.

    The returned ``io.BytesIO`` is rewound to its start.
    """
    buffer = io.BytesIO()
    with fe.DianZIP(buffer) as archive:
        archive.add_invoice_xml(filename, xml_invoice)
    buffer.seek(0)
    return buffer
def _dian_xml_file_name(self, name):
m = hashlib.sha256()
m.update(name.encode('utf-8'))
filename = m.hexdigest()
return filename
def do_fe_delivery(self, facho_invoice, invoice):
    """Sign the invoice XML, send it to the DIAN and store the outcome.

    ``self`` is used as a plain namespace: the helpers are called with an
    explicit receiver (``self._dian_zip_io(self, ...)``), which matches
    callers that pass the class itself.

    In habilitation mode the document goes out as a test-set submission and
    ends up 'delivered' (with the returned ZipKey as track id) or 'failure'.
    Otherwise it is sent synchronously and ends up 'done' when the status
    code is '00', else 'failure'. The result, the QR code, the CUFE and the
    zipped signed XML are written on ``invoice``.
    """
    settings = Pool().get('account_invoice_facho.configuration')(1)

    # Build the XML document and attach the DIAN extensions.
    extensions = self.fe_extensions(invoice, facho_invoice)
    document = self.type_code_xml(invoice, facho_invoice)
    for extension in extensions:
        document.add_extension(extension)
    fe_qrcode = document.get_element_text('./ext:UBLExtensions/ext:UBLExtension/ext:ExtensionContent/sts:DianExtensions/sts:QRCode')
    fe_cufe = document.get_element_text('./cbc:UUID')

    # Sign with the configured certificate.
    with self.acquire_public_key(settings) as key_files:
        signer = fe.DianXMLExtensionSigner(key_files['file_certs'],
                                           key_files['passphrase'],
                                           localpolicy=True)
        signed_document = signer.sign_xml_string(str(document))

    # Package the signed XML; the archive name is the hash of the CUFE.
    filename = fe_cufe + '.xml'
    filename_zip = self._dian_xml_file_name(self, fe_cufe) + '.zip'
    zip_file = self._dian_zip_io(self, filename, signed_document).read()

    testing = settings.dian_fe_habilitation
    if testing:
        request = dian.Habilitacion.SendTestSetAsync(
            filename_zip, zip_file, settings.dian_fe_test_setid)
    else:
        request = dian.SendBillSync(filename_zip, zip_file)
    res = self.do_dian_request(self, request)

    delivery_trackid = 'n/a'
    if testing:
        status_description = "[]"
        error_message = "[]"
        if res.ZipKey:
            delivery_state = 'delivered'
            delivery_trackid = res.ZipKey
        else:
            # No ZipKey came back: the submission was not accepted.
            delivery_state = "failure"
            delivery_trackid = "[]"
            fe_qrcode = None
    else:
        status_description = res.StatusDescription
        error_message = res.ErrorMessage
        delivery_state = 'done' if res.StatusCode == '00' else 'failure'

    outcome = {
        'fe_delivery_state': delivery_state,
        'fe_delivery_trackid': delivery_trackid,
        'fe_delivery_status_description': status_description,
        'fe_delivery_error_message': error_message,
        'fe_qrcode': fe_qrcode,
        'fe_cufe': fe_cufe,
        'fe_xml_file': zip_file,
    }
    self._force_write(self, outcome, invoice)
def fe_process(self):
    """Send this invoice to the DIAN.

    Delegates to ``fe_send``, which builds the facho document and calls
    ``do_fe_delivery`` with the arguments it requires. The previous direct
    call ``self.do_fe_delivery()`` passed none of them and always failed
    with a ``TypeError``. As in ``fe_send``, invoices whose subtype is not
    flagged ``fe_document`` are skipped.
    """
    self.__class__.fe_send([self])
@classmethod
def fe_delivery(cls):
    """Cron entry point: deliver every pending customer invoice.

    Selects posted or paid customer invoices that are not yet 'done' and
    whose resolution is still valid today, and processes each one. In
    habilitation mode the remote status of delivered invoices is polled
    right away.
    """
    pool = Pool()
    FACHO = pool.get('account_invoice_facho.dian_fe_company')
    habilitation = FACHO(1).dian_fe_habilitation
    pending = cls.search([('type', '=', 'out'),
                          ('state', 'in', ['posted', 'paid']),
                          ('subtype.sequence.invoice_resolution.valid_date_time_to',
                           '>=', date.today()),
                          ('fe_delivery_state', '!=', 'done')])
    for inv in pending:
        inv.fe_process()
        # ``fe_update_status`` is a classmethod that takes the invoice list
        # and rejects anything not 'delivered', so only those are polled.
        # Failed invoices are picked up again on the next run.
        if habilitation and inv.fe_delivery_state == 'delivered':
            cls.fe_update_status([inv])
@fields.depends('credit_note')
def fe_extensions(self, inv):
    """Build the DIAN XML extensions for the facho document ``inv``.

    Returns a list with, in order: software security code, authorization
    provider, the document hash extension (CUDE for customer notes, CUFE
    for customer invoices, CUDS for supplier documents), software provider
    and invoice authorization.
    """
    company_settings = Pool().get('account_invoice_facho.dian_fe_company')(1)
    resolution = self.subtype.sequence.invoice_resolution

    # Habilitation mode targets the test environment.
    if company_settings.dian_fe_habilitation:
        environment = fe.AMBIENTE_PRUEBAS
    else:
        environment = fe.AMBIENTE_PRODUCCION

    if self.type == 'out':
        if self.credit_note:
            cufe = fe.DianXMLExtensionCUDE(
                inv, company_settings.dian_fe_pin, environment)
        else:
            cufe = fe.DianXMLExtensionCUFE(
                inv, resolution.technical_key, environment)
    elif self.type == "in":
        cufe = fe.DianXMLExtensionCUDS(
            inv, company_settings.dian_fe_pin, environment)

    security_code = fe.DianXMLExtensionSoftwareSecurityCode(
        company_settings.dian_fe_software_identification,
        company_settings.dian_fe_pin,
        inv.invoice_ident)
    authorization_provider = fe.DianXMLExtensionAuthorizationProvider()

    # Identification of the technology supplier, from its first identifier.
    supplier_identifier = company_settings.dian_fe_technologic_supplier.identifiers[0]
    nit = form.PartyIdentification(
        str(supplier_identifier.code),
        str(supplier_identifier.check_digit),
        str(supplier_identifier.type))
    software_provider = fe.DianXMLExtensionSoftwareProvider(
        nit,
        nit.dv,
        company_settings.dian_fe_software_identification)

    inv_authorization = fe.DianXMLExtensionInvoiceAuthorization(
        str(resolution.resolution_number),
        resolution.valid_date_time_from,
        resolution.valid_date_time_to,
        resolution.prefix,
        resolution.from_number,
        resolution.to_number)

    return [security_code, authorization_provider, cufe,
            software_provider, inv_authorization]
@classmethod
@ModelView.button
def xml(cls, invoices):
    """Store the unsigned XML of each invoice, zipped, in ``fe_xml_file``.

    The document is built with ``tofacho`` and ``type_code_xml`` and added
    to a DIAN ZIP under the name 'xml'.
    """
    for invoice in invoices:
        document = cls.type_code_xml(invoice, invoice.tofacho())
        # Serialise once; the former ``xml_encode`` copy was never used.
        serialized = str(document)
        invoice.fe_xml_file = cls._dian_zip_io(cls, 'xml', serialized).read()
        invoice.save()
@classmethod
@ModelView.button
def xml_signed(cls, invoices):
    """Store the signed XML of each invoice in ``fe_xml_file``.

    The document gets the DIAN extensions from ``fe_extensions``, is signed
    with the configured certificate and saved as UTF-8 bytes (not zipped).
    """
    # The company configuration record looked up here before was never
    # used; ``fe_extensions`` performs its own lookup.
    for invoice in invoices:
        facho_invoice = invoice.tofacho()
        extensions = cls.fe_extensions(invoice, facho_invoice)
        document = cls.type_code_xml(invoice, facho_invoice)
        for extension in extensions:
            document.add_extension(extension)
        xml_document = document.tostring(
            xml_declaration=True, encoding='UTF-8').encode('utf-8')
        with cls.acquire_public_key() as ctx:
            signer = fe.DianXMLExtensionSigner(ctx['file_certs'],
                                               passphrase=ctx['passphrase'],
                                               localpolicy=True)
            signed = signer.sign_xml_string(xml_document)
        invoice.fe_xml_file = bytes(str(signed), 'utf-8')
        invoice.save()
@classmethod
@ModelView.button
def fe_send(cls, invoices):
    """Deliver to the DIAN every invoice whose subtype is an electronic document.

    Invoices whose subtype is not flagged ``fe_document`` are skipped.
    """
    for invoice in invoices:
        if not invoice.subtype.fe_document:
            continue
        # ``do_fe_delivery`` is called with the class as explicit receiver.
        cls.do_fe_delivery(cls, invoice.tofacho(), invoice)
        invoice.save()
@classmethod
@ModelView.button
def fe_update_status(cls, invoices):
    """Poll the DIAN for the status of delivered invoices and store it.

    Each invoice is queried by its ``fe_delivery_trackid`` and ends up
    'done' when the response is valid, 'failure' otherwise. The status
    description and error message of the response are stored too.

    Raises:
        UserError: when an invoice is not in the 'delivered' state.
    """
    config = Pool().get('account_invoice_facho.configuration')(1)
    # The request class depends only on the configuration, so it is chosen
    # once instead of on every iteration.
    if config.dian_fe_habilitation:
        request_class = dian.Habilitacion.GetStatusZip
    else:
        request_class = dian.GetStatusZip

    for invoice in invoices:
        if invoice.fe_delivery_state != 'delivered':
            raise UserError(str("La factura debe estar enviada"))
        resp = cls.do_dian_request(
            cls, request_class(trackId=invoice.fe_delivery_trackid))
        params = {
            'fe_delivery_status_description': resp.StatusDescription,
            'fe_delivery_error_message': resp.ErrorMessage,
            'fe_delivery_state': 'done' if resp.IsValid else 'failure',
        }
        cls._force_write(cls, params, invoice)
@classmethod
@ModelView.button
def fe_email(cls, invoices, from_=None):
    """E-mail ``invoices`` through the 'Enviar Factura por correo' notification.

    The notification is looked up by record name and its first trigger is
    used to send the e-mail. ``from_`` is accepted but not used.

    Raises:
        UserError: when the notification does not exist exactly once or has
            no trigger (previously a bare ValueError/IndexError).
    """
    Notification_Email = Pool().get('notification.email')
    notifications = Notification_Email.search(
        [('rec_name', '=', 'Enviar Factura por correo')])
    if len(notifications) != 1:
        raise UserError(
            'Debe existir una única notificación "Enviar Factura por correo"')
    notification_email, = notifications
    if not notification_email.triggers:
        raise UserError(
            'La notificación "Enviar Factura por correo" no tiene disparadores')
    trigger = notification_email.triggers[0]
    trigger.notification_email.send_email(invoices, trigger)
class Product(metaclass=PoolMeta):
    # Adds the conversion of a product into a facho item.
    __name__ = 'product.product'

    def tofacho(self):
        """Return the facho ``StandardItem`` describing this product.

        The item id is the product code, or "001" when the product has none.
        """
        item_code = self.code or "001"
        return form.StandardItem(
            description=self.name,
            id_=item_code,
            name='Estándar de adopción del contribuyente',
        )
class InvoiceLine(metaclass=PoolMeta):
    # Adds the conversion of an invoice line into a facho invoice line.
    __name__ = 'account.invoice.line'

    def tofacho(self):
        """Return the facho ``InvoiceLine`` for this line.

        Quantity and unit price are sent as absolute values. The tax total
        is built from the rate of the first tax of a taxable line (scheme
        '01'); when no taxable line carries taxes, ``TaxTotalOmit`` is used.
        """
        line_tax = form.TaxTotalOmit()
        for taxes, _unit_price, _quantity, _date_time in self.taxable_lines:
            if len(taxes) == 0:
                continue
            # Only the first tax is considered; a later taxable line with
            # taxes replaces the total computed so far.
            percent = abs(taxes[0].rate * 100)
            subtotal = form.TaxSubTotal(
                percent=percent,
                scheme=form.TaxScheme('01'))
            line_tax = form.TaxTotal(subtotals=[subtotal])

        price = form.Price(
            amount=form.Amount(abs(self.unit_price)),
            type_code='01',
            type='IVA')
        return form.InvoiceLine(
            quantity=form.Quantity(abs(self.quantity), '94'),
            description=self.description,
            item=self.product.tofacho(),
            price=price,
            tax=line_tax)
class InvoiceReportDianZip(Report):
    # Report returning a ZIP with the stored electronic XML archive plus the
    # freshly rendered invoice report.
    __name__ = 'account.invoice.dian.zip'

    @classmethod
    def __setup__(cls):
        """Declare ``execute`` as a read-write RPC call."""
        super(InvoiceReportDianZip, cls).__setup__()
        cls.__rpc__['execute'] = RPC(False)

    @classmethod
    def execute(cls, records, data):
        """Return the ZIP for the single invoice in ``records``.

        The invoice report cache is cleared so the report is rendered again,
        and the rendered file is appended to the archive kept in
        ``fe_xml_file``.

        Returns:
            ('zip', archive bytes, False, name), where ``name`` is the hash
            of the CUFE-based file name.

        Raises:
            UserError: when ``records`` does not hold exactly one invoice or
                the invoice has no CUFE yet.
        """
        pool = Pool()
        # Also rejects an empty selection, which used to fail while unpacking.
        if len(records) != 1:
            raise UserError(str("Imprimir solo uno"))
        Invoice = pool.get('account.invoice')
        invoice, = Invoice.browse(records)
        # Checked before touching the invoice or rendering the report.
        if not invoice.fe_cufe:
            raise UserError(str('Factura no enviada a la Dian'))

        invoice.invoice_report_cache = None
        invoice.save()
        InvoiceReport = pool.get('account.invoice', type='report')
        ext, content, _, _ = InvoiceReport.execute([invoice.id], {})

        zip_file = io.BytesIO(invoice.fe_xml_file)
        name = invoice._dian_xml_file_name(invoice.fe_cufe + '.xml')
        filename = name + '.' + ext
        with zipfile.ZipFile(zip_file, 'a') as pdf_add:
            pdf_add.writestr(filename, content)
        zip_file.seek(0)
        return ('zip', zip_file.read(), False, name)
class InvoiceSubtype(metaclass=PoolMeta):
    'Invoice Subtype'
    __name__ = 'account.invoice.subtype'

    # Flags the subtypes whose invoices are electronic documents; read by
    # the invoice delivery code through ``invoice.subtype.fe_document``.
    fe_document = fields.Boolean(string='Fe Document')