trytond-party_merge/party.py

136 lines
4.0 KiB
Python

# -*- coding: utf-8 -*-
"""
party.py
:copyright: (c) 2014 by Openlabs Technologies & Consulting (P) Limited
:copyright: (C) 2016 by NaN·tic Projectes de programari lliure
:license: BSD, see LICENSE for more details.
"""
from sql import Table
from trytond.model import ModelSQL, ModelView, fields
from trytond.transaction import Transaction
from trytond.pool import PoolMeta, Pool
from trytond.pyson import Eval, Bool, Not
from trytond.rpc import RPC
from trytond.wizard import Wizard, StateView, StateTransition, Button
__all__ = ['Party', 'PartyMergeView', 'PartyMerge']
class MergeMixin(object):
def merge_into(self, target):
"""
Merge current record into target
"""
ModelField = Pool().get('ir.model.field')
model_fields = ModelField.search([
('relation', '=', self.__name__),
('ttype', '=', 'many2one'),
])
if hasattr(self, 'active'):
self.active = False
self.merged_into = target
self.save()
cursor = Transaction().connection.cursor()
to_validate = []
for field in model_fields:
Model = Pool().get(field.model.model)
if isinstance(getattr(Model, field.name), fields.Function):
continue
if not hasattr(Model, '__table__'):
continue
sql_table = Model.__table__()
# Discard sql.Union or others generated by table_query()
if not isinstance(sql_table, Table):
continue
to_validate.append(field)
sql_field = getattr(sql_table, field.name)
cursor.execute(*sql_table.update(
columns=[sql_field],
values=[target.id],
where=(sql_field == self.id)
))
# Validate all related records and target.
# Do it at the very end because we may # temporarily leave
# information inconsistent in the previous loop
for field in model_fields:
Model = Pool().get(field.model.model)
if not isinstance(Model, ModelSQL):
continue
ff = getattr(Model, field.name)
if isinstance(ff, fields.Function) and not ff.searcher:
continue
with Transaction().set_context(active_test=False):
Model.validate(Model.search([
(field.name, '=', target.id),
]))
self.validate([target])
class Party(MergeMixin):
__metaclass__ = PoolMeta
__name__ = 'party.party'
merged_into = fields.Many2One('party.party', 'Merged Into', readonly=True,
states={
'invisible': Not(Bool(Eval('merged_into'))),
})
@classmethod
def __setup__(cls):
super(Party, cls).__setup__()
cls.__rpc__.update({
'merge_parties': RPC(readonly=False, instantiate=slice(0, 2)),
})
@classmethod
def merge_parties(cls, party, target):
party.merge_into(target)
class PartyMergeView(ModelView):
'Party Merge'
__name__ = 'party.party.merge.view'
duplicates = fields.One2Many('party.party', None, 'Duplicates',
readonly=True)
target = fields.Many2One('party.party', 'Target', required=True,
domain=[('id', 'not in', Eval('duplicates'))], depends=['duplicates'])
class PartyMerge(Wizard):
__name__ = 'party.party.merge'
start_state = 'merge'
merge = StateView(
'party.party.merge.view',
'party_merge.party_merge_view', [
Button('Cancel', 'end', 'tryton-cancel'),
Button('OK', 'result', 'tryton-ok', default=True),
]
)
result = StateTransition()
def default_merge(self, fields):
return {
'duplicates': Transaction().context['active_ids'],
}
def transition_result(self):
for party in self.merge.duplicates:
party.merge_into(self.merge.target)
return 'end'