mirror of
https://github.com/NaN-tic/trytond-party_merge.git
synced 2023-12-14 03:13:02 +01:00
136 lines
4 KiB
Python
136 lines
4 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
party.py
|
|
|
|
:copyright: (c) 2014 by Openlabs Technologies & Consulting (P) Limited
|
|
:copyright: (C) 2016 by NaN·tic Projectes de programari lliure
|
|
:license: BSD, see LICENSE for more details.
|
|
"""
|
|
from sql import Table
|
|
from trytond.model import ModelSQL, ModelView, fields
|
|
from trytond.transaction import Transaction
|
|
from trytond.pool import PoolMeta, Pool
|
|
from trytond.pyson import Eval, Bool, Not
|
|
from trytond.rpc import RPC
|
|
from trytond.wizard import Wizard, StateView, StateTransition, Button
|
|
|
|
|
|
__all__ = ['Party', 'PartyMergeView', 'PartyMerge']
|
|
|
|
|
|
class MergeMixin(object):
|
|
def merge_into(self, target):
|
|
"""
|
|
Merge current record into target
|
|
"""
|
|
ModelField = Pool().get('ir.model.field')
|
|
|
|
model_fields = ModelField.search([
|
|
('relation', '=', self.__name__),
|
|
('ttype', '=', 'many2one'),
|
|
])
|
|
|
|
if hasattr(self, 'active'):
|
|
self.active = False
|
|
self.merged_into = target
|
|
self.save()
|
|
|
|
cursor = Transaction().connection.cursor()
|
|
|
|
to_validate = []
|
|
for field in model_fields:
|
|
Model = Pool().get(field.model.model)
|
|
|
|
if isinstance(getattr(Model, field.name), fields.Function):
|
|
continue
|
|
|
|
if not hasattr(Model, '__table__'):
|
|
continue
|
|
|
|
sql_table = Model.__table__()
|
|
# Discard sql.Union or others generated by table_query()
|
|
if not isinstance(sql_table, Table):
|
|
continue
|
|
|
|
to_validate.append(field)
|
|
sql_field = getattr(sql_table, field.name)
|
|
cursor.execute(*sql_table.update(
|
|
columns=[sql_field],
|
|
values=[target.id],
|
|
where=(sql_field == self.id)
|
|
))
|
|
|
|
# Validate all related records and target.
|
|
# Do it at the very end because we may # temporarily leave
|
|
# information inconsistent in the previous loop
|
|
for field in model_fields:
|
|
Model = Pool().get(field.model.model)
|
|
|
|
if not isinstance(Model, ModelSQL):
|
|
continue
|
|
|
|
ff = getattr(Model, field.name)
|
|
if isinstance(ff, fields.Function) and not ff.searcher:
|
|
continue
|
|
|
|
with Transaction().set_context(active_test=False):
|
|
Model.validate(Model.search([
|
|
(field.name, '=', target.id),
|
|
]))
|
|
|
|
self.validate([target])
|
|
|
|
|
|
class Party(MergeMixin):
|
|
__metaclass__ = PoolMeta
|
|
__name__ = 'party.party'
|
|
merged_into = fields.Many2One('party.party', 'Merged Into', readonly=True,
|
|
states={
|
|
'invisible': Not(Bool(Eval('merged_into'))),
|
|
})
|
|
|
|
@classmethod
|
|
def __setup__(cls):
|
|
super(Party, cls).__setup__()
|
|
cls.__rpc__.update({
|
|
'merge_parties': RPC(readonly=False, instantiate=slice(0, 2)),
|
|
})
|
|
|
|
@classmethod
|
|
def merge_parties(cls, party, target):
|
|
party.merge_into(target)
|
|
|
|
|
|
class PartyMergeView(ModelView):
|
|
'Party Merge'
|
|
__name__ = 'party.party.merge.view'
|
|
|
|
duplicates = fields.One2Many('party.party', None, 'Duplicates',
|
|
readonly=True)
|
|
target = fields.Many2One('party.party', 'Target', required=True,
|
|
domain=[('id', 'not in', Eval('duplicates'))], depends=['duplicates'])
|
|
|
|
|
|
class PartyMerge(Wizard):
|
|
__name__ = 'party.party.merge'
|
|
start_state = 'merge'
|
|
|
|
merge = StateView(
|
|
'party.party.merge.view',
|
|
'party_merge.party_merge_view', [
|
|
Button('Cancel', 'end', 'tryton-cancel'),
|
|
Button('OK', 'result', 'tryton-ok', default=True),
|
|
]
|
|
)
|
|
result = StateTransition()
|
|
|
|
def default_merge(self, fields):
|
|
return {
|
|
'duplicates': Transaction().context['active_ids'],
|
|
}
|
|
|
|
def transition_result(self):
|
|
for party in self.merge.duplicates:
|
|
party.merge_into(self.merge.target)
|
|
return 'end'
|