D-Bus testing: added testSyncSecondSession

testSyncSecondSession is similar to testSecondSession: it creates two
sessions and checks that the second one becomes ready once the first
one is done. But instead of detaching from the first session after a
certain delay (as testSecondSession) does, the first session runs
a real sync, which deactivates it when the sync is done.

The DBusUtil.setUpSession() method was refactored so that it can be
used to create multiple sessions.

Remove signal handler in createSession() because callers do not expect
events getting record by the function, should the signal fire later on.
However, if the caller explicitly asks for it by indicating that it
does the waiting itself, then keep the handler in place.
This commit is contained in:
Patrick Ohly 2009-11-19 11:43:12 +01:00
parent 9e3cb6c6a9
commit bd6687bc9b
1 changed files with 48 additions and 16 deletions

View File

@ -347,29 +347,46 @@ class DBusUtil(Timeout):
'/org/syncevolution/Server'),
'org.syncevolution.Server')
def setUpSession(self, config):
self.sessionpath = self.server.StartSession(config)
def createSession(self, config, wait):
"""Return sessionpath and session object for session using 'config'.
A signal handler calls loop.quit() when this session becomes ready.
If wait=True, then this call blocks until the session is ready.
"""
sessionpath = self.server.StartSession(config)
def session_ready(object, ready):
if self.running and ready and object == sessionpath:
DBusUtil.quit_events.append("session " + object + " ready")
loop.quit()
bus.add_signal_receiver(session_ready,
'SessionChanged',
'org.syncevolution.Server',
'org.syncevolution',
None,
byte_arrays=True,
utf8_strings=True)
self.session = dbus.Interface(bus.get_object('org.syncevolution',
self.sessionpath),
'org.syncevolution.Session')
status, error, sources = self.session.GetStatus(utf8_strings=True)
if status == "queuing":
signal = bus.add_signal_receiver(session_ready,
'SessionChanged',
'org.syncevolution.Server',
'org.syncevolution',
None,
byte_arrays=True,
utf8_strings=True)
session = dbus.Interface(bus.get_object('org.syncevolution',
sessionpath),
'org.syncevolution.Session')
status, error, sources = session.GetStatus(utf8_strings=True)
if wait and status == "queuing":
# wait for signal
loop.run()
self.failUnlessEqual(DBusUtil.quit_events, ["session " + self.sessionpath + " ready"])
DBusUtil.quit_events = []
self.failUnlessEqual(DBusUtil.quit_events, ["session " + sessionpath + " ready"])
elif DBusUtil.quit_events:
# signal was processed inside D-Bus call?
self.failUnlessEqual(DBusUtil.quit_events, ["session " + sessionpath + " ready"])
if wait:
# signal no longer needed, remove it because otherwise it
# might record unexpected "session ready" events
signal.remove()
DBusUtil.quit_events = []
return (sessionpath, session)
def setUpSession(self, config):
"""stores ready session in self.sessionpath and self.session"""
self.sessionpath, self.session = self.createSession(config, True)
def setUpListeners(self, sessionpath):
"""records progress and status changes in DBusUtil.events and
@ -854,6 +871,21 @@ class TestSessionAPIsReal(unittest.TestCase, DBusUtil):
self.failUnlessEqual(status, "done")
self.failUnlessEqual(error, 0)
@timeout(300)
def testSyncSecondSession(self):
'''ask for a second session that becomes ready after a real sync'''
sessionpath2, session2 = self.createSession("", False)
status, error, sources = session2.GetStatus(utf8_strings=True)
self.failUnlessEqual(status, "queueing")
self.testSync()
# now wait for second session becoming ready
loop.run()
status, error, sources = session2.GetStatus(utf8_strings=True)
self.failUnlessEqual(status, "idle")
self.failUnlessEqual(DBusUtil.quit_events, ["session " + self.sessionpath + " done",
"session " + sessionpath2 + " ready"])
session2.Detach()
# TODO: don't depend on running a real sync in this test,
# then remove timeout
@timeout(300)