DBus testing: add unit tests for status and progress

Implement 3 unit tests for status and 1 unit test for
progress. For status,
1) test a full status list when a sync is executed
This is checked in TestSessionAPIsReal.testSync
2) test status when session is aborted
3) test status when session is suspended
For progress, test progress number is increasing
when a sync is executed. Also combine this check in
TestSessionAPIsReal.testSync

It is necessary to specify own signal receivers in
subclasses of DBusUtil to let them add new operations.
Two methods "progressChanged" and "statusChanged" are
added in DBusUtil. They are called when handling
statusChanged and progressChanged signals in setUpListeners.
By checking progress in "progressChanged", we can
make sure that aborting or suspending takes place when
sync is running.
This commit is contained in:
Zhu, Yongsheng 2009-12-01 10:15:20 +08:00
parent d7d67621a9
commit 732bd2d586

View file

@ -393,6 +393,16 @@ class DBusUtil(Timeout):
"""stores ready session in self.sessionpath and self.session"""
self.sessionpath, self.session = self.createSession(config, True)
def progressChanged(self, *args):
'''subclasses override this method to write specified callbacks for ProgressChanged signals
It is called by progress signal receivers in setUpListeners'''
pass
def statusChanged(self, *args):
'''subclasses override this method to write specified callbacks for StatusChanged signals
It is called by status signal receivers in setUpListeners'''
pass
def setUpListeners(self, sessionpath):
"""records progress and status changes in DBusUtil.events and
quits the main loop when the session is done"""
@ -400,10 +410,12 @@ class DBusUtil(Timeout):
def progress(*args):
if self.running:
DBusUtil.events.append(("progress", args))
self.progressChanged(args)
def status(*args):
if self.running:
DBusUtil.events.append(("status", args))
self.statusChanged(args)
if args[0] == "done":
if sessionpath:
DBusUtil.quit_events.append("session " + sessionpath + " done")
@ -1018,6 +1030,7 @@ class TestSessionAPIsReal(unittest.TestCase, DBusUtil):
def setUp(self):
self.setUpServer()
self.setUpSession(config)
self.operation = ""
def run(self, result):
self.runTest(result, own_xdg=False)
@ -1028,14 +1041,88 @@ class TestSessionAPIsReal(unittest.TestCase, DBusUtil):
loop.run()
self.failUnlessEqual(DBusUtil.quit_events, ["session " + self.sessionpath + " done"])
def progressChanged(self, *args):
# subclass specifies its own callback for ProgressChanged signals
percentage = args[0]
# make sure sync is really running
if percentage > 20:
if self.operation == "abort":
self.session.Abort()
if self.operation == "suspend":
self.session.Suspend()
@timeout(300)
def testSync(self):
'''run a real sync with default server'''
'''run a real sync with default server and test status list and progress number'''
''' check events list is correct for StatusChanged and ProgressChanged '''
# do sync
self.doSync()
# TODO: check recorded events in DBusUtil.events
# check recorded events in DBusUtil.events, first filter them
statuses = []
progresses = []
# dict is used to maintain status order.
statusPairs = {"": 0, "idle": 1, "running" : 2, "done" : 3}
for item in DBusUtil.events:
if item[0] == "status":
statuses.append(item[1])
elif item[0] == "progress":
progresses.append(item[1])
# check statuses
lastStatus = ""
lastSources = {}
for status, error, sources in statuses:
self.failIf(status == lastStatus and lastSources == sources)
# no error
self.failUnlessEqual(error, 0)
# keep order: session status must be unchanged or the next status
self.failUnless(statusPairs.has_key(status))
self.failUnless(statusPairs[status] >= statusPairs[lastStatus])
for sourcename, value in sources.items():
# no error
self.failUnlessEqual(value[2], 0)
# keep order: source status must also be unchanged or the next status
if lastSources.has_key(sourcename):
lastValue = lastSources[sourcename]
self.failUnless(statusPairs[value[1]] >= statusPairs[lastValue[1]])
lastStatus = status
lastSources = sources
# check increasing progress percentage
lastPercent = 0
for percent, sources in progresses:
self.failIf(percent < lastPercent)
lastPercent = percent
status, error, sources = self.session.GetStatus(utf8_strings=True)
self.failUnlessEqual(status, "done")
self.failUnlessEqual(error, 0)
@timeout(300)
def testSyncStatusAbort(self):
''' test status is set correctly when the session is aborted '''
self.operation = "abort"
self.doSync()
hasAbortingStatus = False
for item in DBusUtil.events:
if item[0] == "status" and item[1][0] == "aborting":
hasAbortingStatus = True
break
self.failUnlessEqual(hasAbortingStatus, True)
@timeout(300)
def testSyncStatusSuspend(self):
''' test status is set correctly when the session is suspended '''
self.operation = "suspend"
self.doSync()
hasSuspendingStatus = False
for item in DBusUtil.events:
if item[0] == "status" and item[1][0] == "suspending":
hasSuspendingStatus = True
break
self.failUnlessEqual(hasSuspendingStatus, True)
@timeout(300)
def testSyncSecondSession(self):