trytond-csv_import/csv_import.py

509 lines
17 KiB
Python

# This file is part of csv_import module for Tryton.
# The COPYRIGHT file at the top level of this repository contains
# the full copyright notices and license terms.
import os
import re
import unicodedata
import string
import csv
from io import StringIO
from datetime import datetime
from trytond.config import config
from trytond.model import ModelSQL, ModelView, fields, Workflow
from trytond.pool import Pool, PoolMeta
from trytond.pyson import Eval
from trytond.transaction import Transaction
from trytond.i18n import gettext
from trytond.exceptions import UserError
__all__ = ['BaseExternalMapping',
'CSVProfile', 'CSVProfileBaseExternalMapping', 'CSVArchive']
def slugify(value):
value = unicodedata.normalize('NFKD', value).encode('ascii', 'ignore')
value = re.sub('[^\w\s-]', '', value.decode('utf-8')).strip().lower()
return re.sub('[-\s]+', '-', value)
class BaseExternalMapping(metaclass=PoolMeta):
__name__ = 'base.external.mapping'
csv_mapping = fields.Many2One('base.external.mapping', 'CSV Mapping')
csv_rel_field = fields.Many2One('ir.model.field', 'CSV Field related')
class CSVProfile(ModelSQL, ModelView):
'CSV Profile'
__name__ = 'csv.profile'
name = fields.Char('Name', required=True)
archives = fields.One2Many('csv.archive', 'profile',
'Archives')
model = fields.Many2One('ir.model', 'Model', required=True)
mappings = fields.Many2Many('csv.profile-base.external.mapping',
'profile', 'mapping', 'Mappings', required=True)
code_internal = fields.Many2One('ir.model.field', 'Tryton Code Field',
domain=[('model', '=', Eval('model'))],
states={
'invisible': ~Eval('update_record', True),
'required': Eval('update_record', True),
}, depends=['model', 'update_record'],
help='Code field in Tryton.')
code_external = fields.Integer("CSV Code Field",
states={
'invisible': ~Eval('update_record', True),
'required': Eval('update_record', True),
}, depends=['model', 'update_record'],
help='Code field in CSV column.')
create_record = fields.Boolean('Create', help='Create record from CSV')
update_record = fields.Boolean('Update', help='Update record from CSV')
testing = fields.Boolean('Testing', help='Not create or update records')
active = fields.Boolean('Active')
csv_header = fields.Boolean('Header',
help='Header (field names) on archives')
csv_archive_separator = fields.Selection([
(',', 'Comma'),
(';', 'Semicolon'),
('tab', 'Tabulator'),
('|', '|'),
], 'CSV Separator', help="Archive CSV Separator",
required=True)
csv_quote = fields.Char('Quote', required=True,
help='Character to use as quote')
note = fields.Text('Notes')
@staticmethod
def default_active():
return True
@staticmethod
def default_create_record():
return True
@staticmethod
def default_update_record():
return False
@staticmethod
def default_csv_header():
return True
@staticmethod
def default_csv_archive_separator():
return ","
@staticmethod
def default_csv_quote():
return '"'
@staticmethod
def default_code_external():
return 0
class CSVProfileBaseExternalMapping(ModelSQL):
'CSV Profile - Base External Mapping'
__name__ = 'csv.profile-base.external.mapping'
_table = 'csv_profile_mapping_rel'
profile = fields.Many2One('csv.profile', 'Profile',
ondelete='CASCADE', select=True, required=True)
mapping = fields.Many2One('base.external.mapping', 'Mapping',
ondelete='RESTRICT', required=True)
class CSVArchive(Workflow, ModelSQL, ModelView):
'CSV Archive'
__name__ = 'csv.archive'
_rec_name = 'archive_name'
profile = fields.Many2One('csv.profile', 'CSV Profile', ondelete='CASCADE',
required=True)
date_archive = fields.DateTime('Date', required=True)
data = fields.Function(fields.Binary('Archive', filename='archive_name',
required=True), 'get_data', setter='set_data')
archive_name = fields.Char('Archive Name')
logs = fields.Text("Logs", readonly=True)
state = fields.Selection([
('draft', 'Draft'),
('done', 'Done'),
('canceled', 'Canceled'),
], 'State', required=True, readonly=True)
@classmethod
def __setup__(cls):
super(CSVArchive, cls).__setup__()
cls._order = [
('date_archive', 'DESC'),
('id', 'DESC'),
]
cls._transitions |= set((
('draft', 'done'),
('draft', 'canceled'),
('canceled', 'draft'),
))
cls._buttons.update({
'cancel': {
'invisible': Eval('state') != 'draft',
'depends': ['state'],
},
'draft': {
'invisible': Eval('state') != 'canceled',
'depends': ['state'],
},
'import_csv': {
'invisible': Eval('state') != 'draft',
'depends': ['state'],
},
})
def get_data(self, name):
path = os.path.join(config.get('database', 'path'),
Transaction().database.name, 'csv_import')
archive = '%s/%s' % (path, self.archive_name.replace(' ', '_'))
try:
with open(archive, 'rb') as f:
return fields.Binary.cast(f.read())
except IOError:
pass
@classmethod
def set_data(cls, archives, name, value):
path = os.path.join(config.get('database', 'path'),
Transaction().database.name, 'csv_import')
if not os.path.exists(path):
os.makedirs(path, mode=0o777)
for archive in archives:
archive = '%s/%s' % (path, archive.archive_name.replace(' ', '_'))
try:
with open(archive, 'wb') as f:
f.write(value)
except IOError:
raise UserError(gettext('csv_import.msg_error'))
@fields.depends('profile', '_parent_profile.rec_name')
def on_change_profile(self):
if self.profile:
today = Pool().get('ir.date').today()
files = len(self.search([
('archive_name', 'like', '%s_%s_%s.csv' %
(today, '%', slugify(self.profile.rec_name))),
]))
self.archive_name = '%s_%s_%s.csv' % \
(today, files, slugify(self.profile.rec_name))
else:
self.archive_name = None
@staticmethod
def default_date_archive():
return datetime.now()
@staticmethod
def default_state():
return 'draft'
@staticmethod
def default_profile():
CSVProfile = Pool().get('csv.profile')
csv_profiles = CSVProfile.search([])
if len(csv_profiles) == 1:
return csv_profiles[0].id
@classmethod
def _import_data_sale(cls, record, values, parent_values=None):
'''
Sale and Sale Line data
'''
pool = Pool()
Sale = pool.get('sale.sale')
SaleLine = pool.get('sale.line')
Party = pool.get('party.party')
record_name = record.__name__
if record_name == 'sale.sale':
party = values.get('party')
if party:
party = Party(party)
if not record.id:
record = Sale.get_sale_data(values.get('party'))
if hasattr(record, 'shop') and not getattr(record, 'shop'):
shop, = pool.get('sale.shop').search([], limit=1)
record.shop = shop
if values.get('invoice_address') \
and values.get('invoice_address') in party.addresses:
record.invoice_address = values.get('invoice_address')
if values.get('shipment_address') \
and values.get('shipment_address') in party.addresses:
record.shipment_address = values.get('shipment_address')
if values.get('customer_reference'):
record.customer_reference = values.get('customer_reference')
if values.get('lines'):
record.lines = values.get('lines')
return record
if record_name == 'sale.line':
if values.get('product') and values.get('quantity'):
sale = Sale.get_sale_data(parent_values.get('party'))
line = SaleLine.get_sale_line_data(
sale,
values.get('product'),
values.get('quantity')
)
line.on_change_product()
return line
return record
@classmethod
def _import_data_purchase(cls, record, values, parent_values=None):
'''
Purchase and Purchase Line data
'''
pool = Pool()
Purchase = pool.get('purchase.purchase')
Party = pool.get('party.party')
record_name = record.__name__
if record_name == 'purchase.purchase':
party = values.get('party')
if party:
party = Party(party)
if not record.id:
default_values = record.default_get(record._fields.keys(),
with_rec_name=False)
for key, value in default_values.items():
setattr(record, key, value)
record.party = party
record.on_change_party()
if values.get('invoice_address') \
and values.get('invoice_address') in party.addresses:
record.invoice_address = values.get('invoice_address')
if values.get('lines'):
record.lines = values.get('lines')
return record
if record_name == 'purchase.line':
if values.get('product') and values.get('quantity'):
purchase = Purchase()
default_values = Purchase.default_get(Purchase._fields.keys(),
with_rec_name=False)
for key, value in default_values.items():
setattr(purchase, key, value)
purchase.party = parent_values.get('party')
purchase.on_change_party()
record.purchase = purchase
record.product = values.get('product')
record.quantity = values.get('quantity')
record.on_change_product()
return record
return record
@classmethod
def _import_data(cls, record, values, parent_values=None):
'''Load _import_data_modelname or seattr from dict'''
method_data = '_import_data_%s' % record.__name__.split('.')[0]
if hasattr(cls, method_data):
import_data = getattr(cls, method_data)
record = import_data(record, values, parent_values)
for k, v in values.items():
setattr(record, k, v)
return record
@classmethod
def post_import(cls, profile, records):
""" This method is made to be overridden and execute something with
imported records after import them. At the end of the inherited
@param profile: profile object
@param records: List of id records.
"""
pass
@classmethod
def _read_csv_file(cls, archive):
'''Read CSV data from archive'''
headers = None
profile = archive.profile
separator = profile.csv_archive_separator
if separator == "tab":
separator = '\t'
quote = profile.csv_quote
header = profile.csv_header
data = StringIO(archive.data.decode('ascii', errors='replace'))
try:
reader = csv.reader(data, delimiter=str(separator),
quotechar=str(quote))
except TypeError:
cls.write([archive], {'logs': 'Error - %s' % (
gettext('csv_import.msg_read_error',
filename=archive.archive_name.replace(' ', '_'))
)})
return
if header:
# TODO. Know why some header columns get ""
headers = ["".join(list(filter(lambda x: x in string.printable,
x.replace('"', '')))) for x in next(reader)]
return reader, headers
@classmethod
@ModelView.button
@Workflow.transition('done')
def import_csv(cls, archives):
'''
Process archives to import data from CSV files
base: base model, e.g: party
childs: new lines related a base, e.g: addresses
'''
pool = Pool()
ExternalMapping = pool.get('base.external.mapping')
logs = []
for archive in archives:
profile = archive.profile
if not profile.create_record and not profile.update_record or not archive.data:
continue
reader, headers = cls._read_csv_file(archive)
base_model = profile.model.model
child_mappings = []
for mapping in profile.mappings:
if not mapping.model.model == base_model:
child_mappings.append(mapping)
else:
base_mapping = mapping
if not base_mapping:
logs.append(gettext('csv_import.msg_not_mapping',
profile=profile.rec_name))
continue
new_records = []
new_lines = []
rows = list(reader)
Base = pool.get(base_model)
for i in range(len(rows)):
row = rows[i]
if not row:
continue
#join header and row to convert a list to dict {header: value}
vals = dict(zip(headers, row))
#get values base model
if not new_lines:
base_values = ExternalMapping.map_external_to_tryton(
base_mapping.name, vals)
if not list(base_values.values()):
continue
if not list(base_values.values())[0] == '':
new_lines = []
#get values child models
child_values = None
child_rel_field = None
for child in child_mappings:
if not child.csv_rel_field:
logs.append(gettext('csv_import.msg_missing_rel_field',
mapping=child.rec_name))
continue
child_rel_field = child.csv_rel_field.name
child_values = ExternalMapping.map_external_to_tryton(
child.name, vals)
Child = pool.get(child.model.model)
child = Child()
# get default values in child model
child_values = cls._import_data(child, child_values,
base_values)
new_lines.append(child_values)
if child_rel_field:
base_values[child_rel_field] = new_lines
# next row is empty first value, is a new line. Continue
if i < len(rows) - 1:
if rows[i + 1]:
if rows[i + 1][0] == '':
continue
else:
new_lines = []
#create object or get object exist
record = None
records = None
if profile.update_record:
val = row[profile.code_external]
records = Base.search([
(profile.code_internal.name, '=', val)
])
if records:
record = Base(records[0])
if profile.create_record and not records:
record = Base()
if not record:
logs.append(gettext('csv_import.msg_not_create_update',
line=i + 1))
continue
#get default values from base model
record = cls._import_data(record, base_values)
#save - not testing
if not profile.testing:
try:
record.save() # save or update
except (UserError, ValueError) as e:
raise UserError(e.__str__())
logs.append(gettext('csv_import.msg_record_saved',
record=record.id))
new_records.append(record.id)
if profile.testing:
logs.append(gettext('csv_import.msg_success_simulation'))
cls.post_import(profile, new_records)
cls.write([archive], {'logs': '\n'.join(logs)})
@classmethod
def copy(cls, archives, default=None):
if default is None:
default = {}
default = default.copy()
default['logs'] = None
return super(CSVArchive, cls).copy(archives, default=default)
@classmethod
@ModelView.button
@Workflow.transition('draft')
def draft(cls, archives):
pass
@classmethod
@ModelView.button
@Workflow.transition('canceled')
def cancel(cls, archives):
pass