509 lines
17 KiB
Python
509 lines
17 KiB
Python
# This file is part of csv_import module for Tryton.
|
|
# The COPYRIGHT file at the top level of this repository contains
|
|
# the full copyright notices and license terms.
|
|
import os
|
|
import re
|
|
import unicodedata
|
|
import string
|
|
import csv
|
|
from io import StringIO
|
|
from datetime import datetime
|
|
from trytond.config import config
|
|
from trytond.model import ModelSQL, ModelView, fields, Workflow
|
|
from trytond.pool import Pool, PoolMeta
|
|
from trytond.pyson import Eval
|
|
from trytond.transaction import Transaction
|
|
from trytond.i18n import gettext
|
|
from trytond.exceptions import UserError
|
|
|
|
|
|
__all__ = ['BaseExternalMapping',
|
|
'CSVProfile', 'CSVProfileBaseExternalMapping', 'CSVArchive']
|
|
|
|
|
|
def slugify(value):
|
|
value = unicodedata.normalize('NFKD', value).encode('ascii', 'ignore')
|
|
value = re.sub('[^\w\s-]', '', value.decode('utf-8')).strip().lower()
|
|
return re.sub('[-\s]+', '-', value)
|
|
|
|
|
|
class BaseExternalMapping(metaclass=PoolMeta):
|
|
__name__ = 'base.external.mapping'
|
|
csv_mapping = fields.Many2One('base.external.mapping', 'CSV Mapping')
|
|
csv_rel_field = fields.Many2One('ir.model.field', 'CSV Field related')
|
|
|
|
|
|
class CSVProfile(ModelSQL, ModelView):
|
|
'CSV Profile'
|
|
__name__ = 'csv.profile'
|
|
name = fields.Char('Name', required=True)
|
|
archives = fields.One2Many('csv.archive', 'profile',
|
|
'Archives')
|
|
model = fields.Many2One('ir.model', 'Model', required=True)
|
|
mappings = fields.Many2Many('csv.profile-base.external.mapping',
|
|
'profile', 'mapping', 'Mappings', required=True)
|
|
code_internal = fields.Many2One('ir.model.field', 'Tryton Code Field',
|
|
domain=[('model', '=', Eval('model'))],
|
|
states={
|
|
'invisible': ~Eval('update_record', True),
|
|
'required': Eval('update_record', True),
|
|
}, depends=['model', 'update_record'],
|
|
help='Code field in Tryton.')
|
|
code_external = fields.Integer("CSV Code Field",
|
|
states={
|
|
'invisible': ~Eval('update_record', True),
|
|
'required': Eval('update_record', True),
|
|
}, depends=['model', 'update_record'],
|
|
help='Code field in CSV column.')
|
|
create_record = fields.Boolean('Create', help='Create record from CSV')
|
|
update_record = fields.Boolean('Update', help='Update record from CSV')
|
|
testing = fields.Boolean('Testing', help='Not create or update records')
|
|
active = fields.Boolean('Active')
|
|
csv_header = fields.Boolean('Header',
|
|
help='Header (field names) on archives')
|
|
csv_archive_separator = fields.Selection([
|
|
(',', 'Comma'),
|
|
(';', 'Semicolon'),
|
|
('tab', 'Tabulator'),
|
|
('|', '|'),
|
|
], 'CSV Separator', help="Archive CSV Separator",
|
|
required=True)
|
|
csv_quote = fields.Char('Quote', required=True,
|
|
help='Character to use as quote')
|
|
note = fields.Text('Notes')
|
|
|
|
@staticmethod
|
|
def default_active():
|
|
return True
|
|
|
|
@staticmethod
|
|
def default_create_record():
|
|
return True
|
|
|
|
@staticmethod
|
|
def default_update_record():
|
|
return False
|
|
|
|
@staticmethod
|
|
def default_csv_header():
|
|
return True
|
|
|
|
@staticmethod
|
|
def default_csv_archive_separator():
|
|
return ","
|
|
|
|
@staticmethod
|
|
def default_csv_quote():
|
|
return '"'
|
|
|
|
@staticmethod
|
|
def default_code_external():
|
|
return 0
|
|
|
|
|
|
class CSVProfileBaseExternalMapping(ModelSQL):
|
|
'CSV Profile - Base External Mapping'
|
|
__name__ = 'csv.profile-base.external.mapping'
|
|
_table = 'csv_profile_mapping_rel'
|
|
profile = fields.Many2One('csv.profile', 'Profile',
|
|
ondelete='CASCADE', select=True, required=True)
|
|
mapping = fields.Many2One('base.external.mapping', 'Mapping',
|
|
ondelete='RESTRICT', required=True)
|
|
|
|
|
|
class CSVArchive(Workflow, ModelSQL, ModelView):
|
|
'CSV Archive'
|
|
__name__ = 'csv.archive'
|
|
_rec_name = 'archive_name'
|
|
profile = fields.Many2One('csv.profile', 'CSV Profile', ondelete='CASCADE',
|
|
required=True)
|
|
date_archive = fields.DateTime('Date', required=True)
|
|
data = fields.Function(fields.Binary('Archive', filename='archive_name',
|
|
required=True), 'get_data', setter='set_data')
|
|
archive_name = fields.Char('Archive Name')
|
|
logs = fields.Text("Logs", readonly=True)
|
|
state = fields.Selection([
|
|
('draft', 'Draft'),
|
|
('done', 'Done'),
|
|
('canceled', 'Canceled'),
|
|
], 'State', required=True, readonly=True)
|
|
|
|
@classmethod
|
|
def __setup__(cls):
|
|
super(CSVArchive, cls).__setup__()
|
|
cls._order = [
|
|
('date_archive', 'DESC'),
|
|
('id', 'DESC'),
|
|
]
|
|
cls._transitions |= set((
|
|
('draft', 'done'),
|
|
('draft', 'canceled'),
|
|
('canceled', 'draft'),
|
|
))
|
|
cls._buttons.update({
|
|
'cancel': {
|
|
'invisible': Eval('state') != 'draft',
|
|
'depends': ['state'],
|
|
},
|
|
'draft': {
|
|
'invisible': Eval('state') != 'canceled',
|
|
'depends': ['state'],
|
|
},
|
|
'import_csv': {
|
|
'invisible': Eval('state') != 'draft',
|
|
'depends': ['state'],
|
|
},
|
|
})
|
|
|
|
def get_data(self, name):
|
|
path = os.path.join(config.get('database', 'path'),
|
|
Transaction().database.name, 'csv_import')
|
|
archive = '%s/%s' % (path, self.archive_name.replace(' ', '_'))
|
|
try:
|
|
with open(archive, 'rb') as f:
|
|
return fields.Binary.cast(f.read())
|
|
except IOError:
|
|
pass
|
|
|
|
@classmethod
|
|
def set_data(cls, archives, name, value):
|
|
path = os.path.join(config.get('database', 'path'),
|
|
Transaction().database.name, 'csv_import')
|
|
if not os.path.exists(path):
|
|
os.makedirs(path, mode=0o777)
|
|
for archive in archives:
|
|
archive = '%s/%s' % (path, archive.archive_name.replace(' ', '_'))
|
|
try:
|
|
with open(archive, 'wb') as f:
|
|
f.write(value)
|
|
except IOError:
|
|
raise UserError(gettext('csv_import.msg_error'))
|
|
|
|
@fields.depends('profile', '_parent_profile.rec_name')
|
|
def on_change_profile(self):
|
|
if self.profile:
|
|
today = Pool().get('ir.date').today()
|
|
files = len(self.search([
|
|
('archive_name', 'like', '%s_%s_%s.csv' %
|
|
(today, '%', slugify(self.profile.rec_name))),
|
|
]))
|
|
self.archive_name = '%s_%s_%s.csv' % \
|
|
(today, files, slugify(self.profile.rec_name))
|
|
else:
|
|
self.archive_name = None
|
|
|
|
@staticmethod
|
|
def default_date_archive():
|
|
return datetime.now()
|
|
|
|
@staticmethod
|
|
def default_state():
|
|
return 'draft'
|
|
|
|
@staticmethod
|
|
def default_profile():
|
|
CSVProfile = Pool().get('csv.profile')
|
|
csv_profiles = CSVProfile.search([])
|
|
if len(csv_profiles) == 1:
|
|
return csv_profiles[0].id
|
|
|
|
@classmethod
|
|
def _import_data_sale(cls, record, values, parent_values=None):
|
|
'''
|
|
Sale and Sale Line data
|
|
'''
|
|
pool = Pool()
|
|
Sale = pool.get('sale.sale')
|
|
SaleLine = pool.get('sale.line')
|
|
Party = pool.get('party.party')
|
|
|
|
record_name = record.__name__
|
|
|
|
if record_name == 'sale.sale':
|
|
party = values.get('party')
|
|
|
|
if party:
|
|
party = Party(party)
|
|
|
|
if not record.id:
|
|
record = Sale.get_sale_data(values.get('party'))
|
|
|
|
if hasattr(record, 'shop') and not getattr(record, 'shop'):
|
|
shop, = pool.get('sale.shop').search([], limit=1)
|
|
record.shop = shop
|
|
|
|
if values.get('invoice_address') \
|
|
and values.get('invoice_address') in party.addresses:
|
|
record.invoice_address = values.get('invoice_address')
|
|
|
|
if values.get('shipment_address') \
|
|
and values.get('shipment_address') in party.addresses:
|
|
record.shipment_address = values.get('shipment_address')
|
|
|
|
if values.get('customer_reference'):
|
|
record.customer_reference = values.get('customer_reference')
|
|
|
|
if values.get('lines'):
|
|
record.lines = values.get('lines')
|
|
|
|
return record
|
|
|
|
if record_name == 'sale.line':
|
|
if values.get('product') and values.get('quantity'):
|
|
sale = Sale.get_sale_data(parent_values.get('party'))
|
|
line = SaleLine.get_sale_line_data(
|
|
sale,
|
|
values.get('product'),
|
|
values.get('quantity')
|
|
)
|
|
line.on_change_product()
|
|
|
|
return line
|
|
|
|
return record
|
|
|
|
@classmethod
|
|
def _import_data_purchase(cls, record, values, parent_values=None):
|
|
'''
|
|
Purchase and Purchase Line data
|
|
'''
|
|
pool = Pool()
|
|
Purchase = pool.get('purchase.purchase')
|
|
Party = pool.get('party.party')
|
|
|
|
record_name = record.__name__
|
|
|
|
if record_name == 'purchase.purchase':
|
|
party = values.get('party')
|
|
|
|
if party:
|
|
party = Party(party)
|
|
|
|
if not record.id:
|
|
default_values = record.default_get(record._fields.keys(),
|
|
with_rec_name=False)
|
|
for key, value in default_values.items():
|
|
setattr(record, key, value)
|
|
record.party = party
|
|
record.on_change_party()
|
|
|
|
if values.get('invoice_address') \
|
|
and values.get('invoice_address') in party.addresses:
|
|
record.invoice_address = values.get('invoice_address')
|
|
|
|
if values.get('lines'):
|
|
record.lines = values.get('lines')
|
|
|
|
return record
|
|
|
|
if record_name == 'purchase.line':
|
|
if values.get('product') and values.get('quantity'):
|
|
purchase = Purchase()
|
|
default_values = Purchase.default_get(Purchase._fields.keys(),
|
|
with_rec_name=False)
|
|
for key, value in default_values.items():
|
|
setattr(purchase, key, value)
|
|
purchase.party = parent_values.get('party')
|
|
purchase.on_change_party()
|
|
|
|
record.purchase = purchase
|
|
record.product = values.get('product')
|
|
record.quantity = values.get('quantity')
|
|
record.on_change_product()
|
|
|
|
return record
|
|
|
|
return record
|
|
|
|
@classmethod
|
|
def _import_data(cls, record, values, parent_values=None):
|
|
'''Load _import_data_modelname or seattr from dict'''
|
|
method_data = '_import_data_%s' % record.__name__.split('.')[0]
|
|
|
|
if hasattr(cls, method_data):
|
|
import_data = getattr(cls, method_data)
|
|
record = import_data(record, values, parent_values)
|
|
for k, v in values.items():
|
|
setattr(record, k, v)
|
|
return record
|
|
|
|
@classmethod
|
|
def post_import(cls, profile, records):
|
|
""" This method is made to be overridden and execute something with
|
|
imported records after import them. At the end of the inherited
|
|
@param profile: profile object
|
|
@param records: List of id records.
|
|
"""
|
|
pass
|
|
|
|
@classmethod
|
|
def _read_csv_file(cls, archive):
|
|
'''Read CSV data from archive'''
|
|
headers = None
|
|
profile = archive.profile
|
|
|
|
separator = profile.csv_archive_separator
|
|
if separator == "tab":
|
|
separator = '\t'
|
|
quote = profile.csv_quote
|
|
header = profile.csv_header
|
|
|
|
data = StringIO(archive.data.decode('ascii', errors='replace'))
|
|
try:
|
|
reader = csv.reader(data, delimiter=str(separator),
|
|
quotechar=str(quote))
|
|
except TypeError:
|
|
cls.write([archive], {'logs': 'Error - %s' % (
|
|
gettext('csv_import.msg_read_error',
|
|
filename=archive.archive_name.replace(' ', '_'))
|
|
)})
|
|
return
|
|
|
|
if header:
|
|
# TODO. Know why some header columns get ""
|
|
headers = ["".join(list(filter(lambda x: x in string.printable,
|
|
x.replace('"', '')))) for x in next(reader)]
|
|
return reader, headers
|
|
|
|
@classmethod
|
|
@ModelView.button
|
|
@Workflow.transition('done')
|
|
def import_csv(cls, archives):
|
|
'''
|
|
Process archives to import data from CSV files
|
|
base: base model, e.g: party
|
|
childs: new lines related a base, e.g: addresses
|
|
'''
|
|
pool = Pool()
|
|
ExternalMapping = pool.get('base.external.mapping')
|
|
|
|
logs = []
|
|
for archive in archives:
|
|
profile = archive.profile
|
|
|
|
if not profile.create_record and not profile.update_record or not archive.data:
|
|
continue
|
|
|
|
reader, headers = cls._read_csv_file(archive)
|
|
|
|
base_model = profile.model.model
|
|
|
|
child_mappings = []
|
|
for mapping in profile.mappings:
|
|
if not mapping.model.model == base_model:
|
|
child_mappings.append(mapping)
|
|
else:
|
|
base_mapping = mapping
|
|
if not base_mapping:
|
|
logs.append(gettext('csv_import.msg_not_mapping',
|
|
profile=profile.rec_name))
|
|
continue
|
|
|
|
new_records = []
|
|
new_lines = []
|
|
rows = list(reader)
|
|
Base = pool.get(base_model)
|
|
for i in range(len(rows)):
|
|
row = rows[i]
|
|
if not row:
|
|
continue
|
|
|
|
#join header and row to convert a list to dict {header: value}
|
|
vals = dict(zip(headers, row))
|
|
|
|
#get values base model
|
|
if not new_lines:
|
|
base_values = ExternalMapping.map_external_to_tryton(
|
|
base_mapping.name, vals)
|
|
if not list(base_values.values()):
|
|
continue
|
|
if not list(base_values.values())[0] == '':
|
|
new_lines = []
|
|
|
|
#get values child models
|
|
child_values = None
|
|
child_rel_field = None
|
|
for child in child_mappings:
|
|
if not child.csv_rel_field:
|
|
logs.append(gettext('csv_import.msg_missing_rel_field',
|
|
mapping=child.rec_name))
|
|
continue
|
|
child_rel_field = child.csv_rel_field.name
|
|
child_values = ExternalMapping.map_external_to_tryton(
|
|
child.name, vals)
|
|
Child = pool.get(child.model.model)
|
|
child = Child()
|
|
# get default values in child model
|
|
child_values = cls._import_data(child, child_values,
|
|
base_values)
|
|
new_lines.append(child_values)
|
|
|
|
if child_rel_field:
|
|
base_values[child_rel_field] = new_lines
|
|
|
|
# next row is empty first value, is a new line. Continue
|
|
if i < len(rows) - 1:
|
|
if rows[i + 1]:
|
|
if rows[i + 1][0] == '':
|
|
continue
|
|
else:
|
|
new_lines = []
|
|
|
|
#create object or get object exist
|
|
record = None
|
|
records = None
|
|
if profile.update_record:
|
|
val = row[profile.code_external]
|
|
records = Base.search([
|
|
(profile.code_internal.name, '=', val)
|
|
])
|
|
if records:
|
|
record = Base(records[0])
|
|
if profile.create_record and not records:
|
|
record = Base()
|
|
|
|
if not record:
|
|
logs.append(gettext('csv_import.msg_not_create_update',
|
|
line=i + 1))
|
|
continue
|
|
|
|
#get default values from base model
|
|
record = cls._import_data(record, base_values)
|
|
|
|
#save - not testing
|
|
if not profile.testing:
|
|
try:
|
|
record.save() # save or update
|
|
except (UserError, ValueError) as e:
|
|
raise UserError(e.__str__())
|
|
|
|
logs.append(gettext('csv_import.msg_record_saved',
|
|
record=record.id))
|
|
new_records.append(record.id)
|
|
|
|
if profile.testing:
|
|
logs.append(gettext('csv_import.msg_success_simulation'))
|
|
|
|
cls.post_import(profile, new_records)
|
|
cls.write([archive], {'logs': '\n'.join(logs)})
|
|
|
|
@classmethod
|
|
def copy(cls, archives, default=None):
|
|
if default is None:
|
|
default = {}
|
|
default = default.copy()
|
|
default['logs'] = None
|
|
return super(CSVArchive, cls).copy(archives, default=default)
|
|
|
|
@classmethod
|
|
@ModelView.button
|
|
@Workflow.transition('draft')
|
|
def draft(cls, archives):
|
|
pass
|
|
|
|
@classmethod
|
|
@ModelView.button
|
|
@Workflow.transition('canceled')
|
|
def cancel(cls, archives):
|
|
pass
|