- Add a lock to ensure sync and check are not executed in parallel on the client

- Use a single remote call for synchronization
- Delete all synchro.mapping entries if the client was reset
- Add asserts
- Sync product.uom and other synchronization improvements
This commit is contained in:
Albert Cervera i Areny 2015-02-02 21:52:18 +01:00
parent dd52acfe0a
commit 0c7f115479
1 changed files with 106 additions and 68 deletions

View File

@ -1,7 +1,6 @@
# The COPYRIGHT file at the top level of this repository contains the full
# copyright notices and license terms.
import sys
#import ast
import xmlrpclib
from datetime import datetime
import random
@ -199,6 +198,7 @@ class CheckPlan(ModelSQL, ModelView):
integer_to_create = []
float_to_create = []
char_to_create = []
logging.info('Checking %s' % plan.type.internal_name)
res = getattr(plan, plan.type.internal_name)()
for result in res:
t = ResultType.search([
@ -262,11 +262,10 @@ class CheckPlan(ModelSQL, ModelView):
@classmethod
def check_all(cls):
pool = Pool()
Plan = pool.get('monitoring.check.plan')
Check = pool.get('monitoring.check')
plans = Plan.search([])
Check = Pool().get('monitoring.check')
plans = cls.search([])
to_check = []
Transaction().cursor.lock(cls._table)
for plan in plans:
checks = Check.search([
('plan', '=', plan.id),
@ -280,7 +279,8 @@ class CheckPlan(ModelSQL, ModelView):
if (delta.seconds / 3600.0) >= plan.scheduler.normal_check_interval:
to_check.append(plan)
Plan.check(Plan.browse([x.id for x in to_check]))
cls.check(cls.browse([x.id for x in to_check]))
logging.info('check_all finished')
def get_attribute(self, name):
"""
@ -319,7 +319,6 @@ class StateIndicatorCheckPlan(ModelSQL, ModelView):
@classmethod
def get_lasts(cls, records, names):
import logging
res = {}
for name in ('last_state', 'last_check', 'last_state_type',
'last_state_value', 'color'):
@ -331,7 +330,6 @@ class StateIndicatorCheckPlan(ModelSQL, ModelView):
Plan = Pool().get('monitoring.check.plan')
check_ids = []
mapping = {}
logging.info("Grouping plans")
plan_ids = list(set([x.plan.id for x in records]))
checks = Check.search([
@ -356,11 +354,9 @@ class StateIndicatorCheckPlan(ModelSQL, ModelView):
res['last_check'][record.id] = check_id
check_ids.append(check_id)
logging.info("Mapping ready")
states = State.search([
('check', 'in', check_ids),
])
logging.info("Got states")
for state in states:
key = (state.check.id, state.indicator.id)
if key in mapping:
@ -369,7 +365,6 @@ class StateIndicatorCheckPlan(ModelSQL, ModelView):
res['last_state_value'][mapping[key]] = state.value
res['color'][mapping[key]] = (state.state.color if state.state
else 'black')
logging.info("GOt it")
return res
def get_asset(self, name):
@ -521,8 +516,7 @@ class Asset:
def __setup__(cls):
super(Asset, cls).__setup__()
cls.__rpc__.update({
'update_remote_checks': RPC(readonly=False),
'fetch_remote_assets': RPC(),
'server_sync': RPC(readonly=False),
})
def get_attribute(self, name, browsed=None):
@ -578,13 +572,13 @@ class Asset:
@classmethod
def get_login(cls, login, password):
'''
Return user id if password matches
Return asset if password matches
'''
user_id, password_hash = cls._get_login(login)
if user_id:
if cls.check_password(password, password_hash):
return user_id
return 0
return cls(user_id)
return None
@staticmethod
def hash_method():
@ -668,12 +662,14 @@ class Asset:
# TODO: Reference fields
value = ''
if name in mappings and value:
mapping, = SynchroMapping.search([
db_mappings = SynchroMapping.search([
('local_id', '=', value),
('model', '=', mappings[name]),
('peer', '=', peer),
])
value = mapping.remote_id
assert db_mappings, ('No mappings found with local_id=%s, '
'model=%s, peer=%s' % (value, mappings[name], peer))
value = db_mappings[0].remote_id
res[name] = value
return res
@ -699,12 +695,14 @@ class Asset:
continue
value = overrides.get(name, value)
if name in mappings and value:
mapping, = SynchroMapping.search([
db_mappings = SynchroMapping.search([
('remote_id', '=', value),
('model', '=', mappings[name]),
('peer', '=', peer),
])
value = mapping.local_id
assert db_mappings, ('No mappings found with remote_id=%s, '
'model=%s, peer=%s' % (value, mappings[name], peer))
value = db_mappings[0].local_id
setattr(obj, name, value)
return obj
@ -742,42 +740,37 @@ class Asset:
to_create.append(Asset.dict_to_object(record, cls, peer=peer,
overrides=overrides, mappings=mappings))
new_records.append(record)
new_ids = [r.id for r in cls.create([x._save_values for x in to_create])]
for local_id, remote in izip(new_ids, new_records):
map_records.append({
'local_id': local_id,
'remote_id': remote['id'],
'model': cls.__name__,
'peer': peer,
})
if to_create or new_records:
new_ids = [r.id for r in cls.create([x._save_values for x in to_create])]
for local_id, remote in izip(new_ids, new_records):
map_records.append({
'local_id': local_id,
'remote_id': remote['id'],
'model': cls.__name__,
'peer': peer,
})
else:
new_ids = []
if map_records:
SynchroMapping.create(map_records)
return cls.browse(local_ids + new_ids)
@classmethod
def fetch_remote_assets(cls, login, password):
logging.info('fetch_remote_assets: %s' % login)
def fetch_remote_assets(self):
logging.info('fetch_remote_assets: %s' % self.login)
AssetRelationAll = Pool().get('asset.relation.all')
ResultType = Pool().get('monitoring.result.type')
asset_id = cls.get_login(login, password)
if not asset_id:
logging.getLogger('monitoring').error('No asset found for login %s' %
login)
raise Exception('Incorrect login or password')
asset = cls(asset_id)
products = []
assets = set()
assets.add(asset)
assets.add(self)
plans = []
schedulers = set()
check_types = set()
attribute_sets = set()
if asset.attribute_set:
attribute_sets.add(asset.attribute_set)
for plan in asset.plans:
if self.attribute_set:
attribute_sets.add(self.attribute_set)
for plan in self.plans:
if plan.monitored_asset:
assets.add(plan.monitored_asset)
if plan.monitored_asset.attribute_set:
@ -789,51 +782,74 @@ class Asset:
result_types = ResultType.search([])
data = {}
data['schedulers'] = cls.export_objects(list(schedulers))
data['check_types'] = cls.export_objects(list(check_types),
data['schedulers'] = self.export_objects(list(schedulers))
data['check_types'] = self.export_objects(list(check_types),
model_data=True)
data['result_types'] = cls.export_objects(result_types, model_data=True)
data['plans'] = cls.export_objects(plans)
data['asset_attribute_sets'] = cls.export_objects(list(attribute_sets))
data['assets'] = cls.export_objects(list(assets))
logging.info('fetch_remote_assets: %s finished' % login)
data['result_types'] = self.export_objects(result_types,
model_data=True)
data['plans'] = self.export_objects(plans)
data['asset_attribute_sets'] = self.export_objects(list(attribute_sets))
data['assets'] = self.export_objects(list(assets))
logging.info('fetch_remote_assets: %s finished' % self.login)
return data
@classmethod
def update_remote_checks(cls, login, password, data):
logging.info('update_remote_checks: %s' % login)
if not cls.get_login(login, password):
logging.getLogger('monitoring').error('No asset found for login %s' %
login)
def server_sync(cls, login, password, data, clear):
logging.info('server_sync: %s' % login)
SynchroMapping = Pool().get('synchro.mapping')
asset = cls.get_login(login, password)
if not asset:
logging.getLogger('monitoring').error('No asset found for login '
'%s' % login)
raise Exception('Incorrect login or password')
if clear:
# Should only be removed the first time a new server synchronizes.
# Necessary in case the remote database has been cleared.
SynchroMapping.delete(SynchroMapping.search([
('peer', '=', login),
]))
else:
asset.update_remote_checks(data)
res = asset.fetch_remote_assets()
logging.info('server_sync: %s finished' % login)
return res
def update_remote_checks(self, data):
logging.info('update_remote_checks: %s' % self.login)
pool = Pool()
Check = pool.get('monitoring.check')
IntegerResult = pool.get('monitoring.result.integer')
FloatResult = pool.get('monitoring.result.float')
CharResult = pool.get('monitoring.result.char')
CheckPlan = pool.get('monitoring.check.plan')
SynchroMapping = pool.get('synchro.mapping')
ProductUom = pool.get('product.uom')
if not data['checks']:
return
checks = cls.import_objects(data['checks'], Check, peer=login)
cls.import_objects(data['integer_results'], IntegerResult, peer=login,
mappings={
checks = self.import_objects(data['checks'], Check, peer=self.login)
self.import_objects(data['integer_results'], IntegerResult,
peer=self.login, mappings={
'check': 'monitoring.check',
})
cls.import_objects(data['float_results'], FloatResult, peer=login,
self.import_objects(data['product_uoms'], ProductUom, peer=self.login)
self.import_objects(data['float_results'], FloatResult, peer=self.login,
mappings={
'check': 'monitoring.check',
'uom': 'product.uom',
})
cls.import_objects(data['char_results'], CharResult, peer=login,
self.import_objects(data['char_results'], CharResult, peer=self.login,
mappings={
'check': 'monitoring.check',
})
CheckPlan.create_indicators(Check.browse([x.id for x in checks]))
logging.info('update_remote_checks: %s finished' % login)
logging.info('update_remote_checks: %s finished' % self.login)
@classmethod
def sync(cls):
def client_sync(cls):
logging.info('client_sync')
pool = Pool()
Check = pool.get('monitoring.check')
IntegerResult = pool.get('monitoring.result.integer')
@ -849,28 +865,48 @@ class Asset:
ModelData = pool.get('ir.model.data')
CheckType = pool.get('monitoring.check.type')
ResultType = pool.get('monitoring.result.type')
ProductUom = pool.get('product.uom')
Transaction().cursor.lock(Plan._table)
data = {}
checks = Check.search([])
data['checks'] = cls.export_objects(checks, mappings={
'plan': 'monitoring.check.plan',
'monitoring_asset': 'asset',
'monitored_asset': 'asset',
'type': 'monitoring.check.type',
})
integers = IntegerResult.search([])
data['integer_results'] = cls.export_objects(integers)
data['integer_results'] = cls.export_objects(integers, mappings={
'type': 'monitoring.result.type',
})
product_uoms = ProductUom.search([])
data['product_uoms'] = cls.export_objects(product_uoms,
model_data=True)
floats = FloatResult.search([])
data['float_results'] = cls.export_objects(floats)
data['float_results'] = cls.export_objects(floats, mappings={
'type': 'monitoring.result.type',
})
chars = CharResult.search([])
data['char_results'] = cls.export_objects(chars)
data['char_results'] = cls.export_objects(chars, mappings={
'type': 'monitoring.result.type',
})
uri = config.get('monitoring', 'uri')
username = config.get('monitoring', 'username')
password = config.get('monitoring', 'password')
server = xmlrpclib.ServerProxy(uri, allow_none=True)
context = server.model.res.user.get_preferences(True, {})
server.model.asset.update_remote_checks(username, password, data, context)
data = server.model.asset.fetch_remote_assets(username, password, context)
remote_clear = True
if Asset.search([], limit=1):
# Should only be removed the first time a new server synchronizes.
# Necessary in case the remote database has been cleared.
remote_clear = False
data = server.model.asset.server_sync(username, password, data,
remote_clear, context)
Check.delete(checks)
IntegerResult.delete(checks)
@ -910,13 +946,15 @@ class Asset:
'product': asset_product.id,
})
cls.import_objects(data['schedulers'], Scheduler)
cls.import_objects(data['check_types'], CheckType)
cls.import_objects(data['result_types'], ResultType)
cls.import_objects(data['plans'], Plan, mappings={
'monitoring_asset': 'asset',
'monitored_asset': 'asset',
'scheduler': 'monitoring.scheduler',
'type': 'monitoring.check.type',
})
cls.import_objects(data['check_types'], CheckType)
cls.import_objects(data['result_types'], ResultType)
logging.info('client_sync finished')
class StateTypeParty(ModelSQL):