PIM Testing: cover StopSync()

The test covers aborting a running sync as well as a pending one; both
goes through different code paths.

The asserts only work in unittest from Python >= 2.7. For testing the
PIM Manager that is okay, because it needs a fairly recent software
elsewhere, too. Only test-dbus.py itself must still work with older
Python.
This commit is contained in:
Patrick Ohly 2012-09-20 14:05:28 +02:00
parent 647cb91fd6
commit a044071e0c
2 changed files with 63 additions and 2 deletions

View file

@ -46,8 +46,9 @@ class Manager : public GDBusCXX::DBusObjectHelper
*/
std::set<std::string> m_enabledPeers;
typedef std::list< std::pair< boost::shared_ptr<GDBusCXX::Result>, boost::shared_ptr<Session> > > Pending_t;
/** holds the references to pending session requests, see runInSession() */
std::list< boost::shared_ptr<Session> > m_pending;
Pending_t m_pending;
Manager(const boost::shared_ptr<Server> &server);
void init();

View file

@ -46,7 +46,7 @@ testFolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect
if testFolder not in sys.path:
sys.path.insert(0, testFolder)
from testdbus import DBusUtil, timeout, usingValgrind, xdg_root, bus
from testdbus import DBusUtil, timeout, property, usingValgrind, xdg_root, bus, logging, loop
import testdbus
@ -356,5 +356,65 @@ END:VCARD'''
self.exportCache(uid, export)
self.compareDBs(contacts, export)
@timeout(100)
@property("ENV", "SYNCEVOLUTION_SYNC_DELAY=200")
def testSyncAbort(self):
'''TestContacts.testSyncAbort - test StopSync()'''
self.setUpServer()
sources = self.currentSources()
expected = sources.copy()
peers = {}
# dummy peer directory
contacts = os.path.abspath(os.path.join(xdg_root, 'contacts'))
os.makedirs(contacts)
# add foo
uid = self.uidPrefix + 'foo'
peers[uid] = {'protocol': 'files',
'address': contacts}
self.manager.SetPeer(uid,
peers[uid],
timeout=self.timeout)
expected.add(self.managerPrefix + uid)
self.assertEqual(peers, self.manager.GetAllPeers(timeout=self.timeout))
self.assertEqual(expected, self.currentSources())
# Start a sync. Because of SYNCEVOLUTION_SYNC_DELAY, this will block until
# we kill it.
syncCompleted = [ False, False ]
self.aborted = False
def result(index, res):
syncCompleted[index] = res
def output(path, level, text, procname):
if self.running and not self.aborted and text == 'ready to sync':
logging.printf('aborting sync')
self.manager.StopSync(uid)
self.aborted = True
receiver = bus.add_signal_receiver(output,
'LogOutput',
'org.syncevolution.Server',
self.server.bus_name,
byte_arrays=True,
utf8_strings=True)
try:
self.manager.SyncPeer(uid,
reply_handler=lambda: result(0, True),
error_handler=lambda x: result(0, x))
self.manager.SyncPeer(uid,
reply_handler=lambda: result(1, True),
error_handler=lambda x: result(1, x))
self.runUntil('both syncs done',
check=lambda: True,
until=lambda: not False in syncCompleted)
finally:
receiver.remove()
# Check for specified error.
self.assertIsInstance(syncCompleted[0], dbus.DBusException)
self.assertEqual('org._01.pim.contacts.Manager.Aborted', syncCompleted[0].get_dbus_name())
self.assertIsInstance(syncCompleted[1], dbus.DBusException)
self.assertEqual('org._01.pim.contacts.Manager.Aborted', syncCompleted[1].get_dbus_name())
if __name__ == '__main__':
unittest.main()