test-dbus.py: added Connection tests

The new TestConnection suite covers some Connection API related
aspects:
- creating a connection and closing it
- sending an invalid message
- sending a valid message

Right now these tests assume that a "syncevolution_server" config
exists. They depend on getting one Abort signal per connection
to shut down each test.

The signal recording code from TestDBusSync was moved into DBusUtil
so that the different tests can share this utility code.

Some sleeps are necessary after sync sessions because SIGTERM might
get caught without properly shutting down synevo-dbus-server (MB#7555).
This commit is contained in:
Patrick Ohly 2009-11-05 18:04:41 +01:00
parent aa15620845
commit a272d747f9
1 changed files with 119 additions and 22 deletions

View File

@ -52,6 +52,10 @@ class DBusUtil:
"""Contains the common run() method for all D-Bus test suites
and some utility functions."""
def __init__(self):
self.events = []
self.reply = None
def runTest(self, result, own_xdg=True):
"""Starts the D-Bus server and dbus-monitor before the test
itself. After the test run, the output of these two commands
@ -162,6 +166,53 @@ class DBusUtil:
if status == "queuing":
loop.run()
def setUpListeners(self, sessionpath):
"""records progress and status changes in self.events and
quits the main loop when the session is done"""
def progress(*args):
self.events.append(("progress", args))
def status(*args):
self.events.append(("status", args))
if args[0] == "done":
loop.quit()
bus.add_signal_receiver(progress,
'ProgressChanged',
'org.syncevolution.Session',
'org.syncevolution',
sessionpath,
byte_arrays=True,
utf8_strings=True)
bus.add_signal_receiver(status,
'StatusChanged',
'org.syncevolution.Session',
'org.syncevolution',
sessionpath,
byte_arrays=True,
utf8_strings=True)
def setUpConnectionListeners(self, conpath):
"""records connection signals (abort and reply), quits when
getting an abort"""
def abort():
self.events.append(("abort",))
loop.quit()
def reply(*args):
self.reply = args
loop.quit()
bus.add_signal_receiver(abort,
'Abort',
'org.syncevolution.Connection',
'org.syncevolution',
conpath,
byte_arrays=True,
utf8_strings=True)
bus.add_signal_receiver(reply,
'Reply',
'org.syncevolution.Connection',
'org.syncevolution',
conpath,
byte_arrays=True,
utf8_strings=True)
class TestDBusServer(unittest.TestCase, DBusUtil):
"""Tests for the read-only Server API."""
@ -244,6 +295,7 @@ class TestDBusSync(unittest.TestCase, DBusUtil):
"""Executes a real sync."""
def setUp(self):
DBusUtil.__init__(self)
self.setUpServer()
self.setUpSession(config)
@ -251,33 +303,78 @@ class TestDBusSync(unittest.TestCase, DBusUtil):
self.runTest(result, own_xdg=False)
def testSync(self):
events = []
def progress(*args):
events.append(("progress", args))
def status(*args):
events.append(("status", args))
if args[0] == "done":
loop.quit()
bus.add_signal_receiver(progress,
'ProgressChanged',
'org.syncevolution.Session',
'org.syncevolution',
self.sessionpath,
byte_arrays=True,
utf8_strings=True)
bus.add_signal_receiver(status,
'StatusChanged',
'org.syncevolution.Session',
'org.syncevolution',
self.sessionpath,
byte_arrays=True,
utf8_strings=True)
self.setUpListeners(self.sessionpath)
self.session.Sync("", {})
loop.run()
# TODO: check recorded events
# TODO: check recorded events in self.events
status, error, sources = self.session.GetStatus(utf8_strings=True)
self.failUnlessEqual(status, "done")
self.failUnlessEqual(error, 0)
# TODO: this shouldn't be necessary, but somehow the
# syncevo-dbus-server keeps running unless we give it some time.
# Incomplete handling of SIGTERM?! See Bugzilla #7555.
time.sleep(5)
class TestConnection(unittest.TestCase, DBusUtil):
"""Tests Server.Connect(). Tests depend on getting one Abort signal to terminate."""
"""a real message sent to our own server, DevInf stripped"""
message1 = '''<?xml version="1.0" encoding="UTF-8"?><SyncML xmlns='SYNCML:SYNCML1.2'><SyncHdr><VerDTD>1.2</VerDTD><VerProto>SyncML/1.2</VerProto><SessionID>255</SessionID><MsgID>1</MsgID><Target><LocURI>http://127.0.0.1:9000/syncevolution</LocURI></Target><Source><LocURI>sc-pim-117cdd45-3892-4a2d-ae62-4c98cb0f3eae</LocURI><LocName>test</LocName></Source><Cred><Meta><Format xmlns='syncml:metinf'>b64</Format><Type xmlns='syncml:metinf'>syncml:auth-md5</Type></Meta><Data>kHzMn3RWFGWSKeBpXicppQ==</Data></Cred><Meta><MaxMsgSize xmlns='syncml:metinf'>20000</MaxMsgSize><MaxObjSize xmlns='syncml:metinf'>4000000</MaxObjSize></Meta></SyncHdr><SyncBody><Alert><CmdID>1</CmdID><Data>200</Data><Item><Target><LocURI>addressbook</LocURI></Target><Source><LocURI>./addressbook</LocURI></Source><Meta><Anchor xmlns='syncml:metinf'><Last>20091105T092757Z</Last><Next>20091105T092831Z</Next></Anchor><MaxObjSize xmlns='syncml:metinf'>4000000</MaxObjSize></Meta></Item></Alert><Final/></SyncBody></SyncML>'''
def setUp(self):
DBusUtil.__init__(self)
self.setUpServer()
self.setUpListeners(None)
def run(self, result):
self.runTest(result, own_xdg=False)
def getConnection(self):
conpath = self.server.Connect({'description': 'test-dbus.py',
'config': 'syncevolution_server',
'transport': 'dummy'},
False,
"")
self.setUpConnectionListeners(conpath)
connection = dbus.Interface(bus.get_object('org.syncevolution',
conpath),
'org.syncevolution.Connection')
return (conpath, connection)
def testConnect(self):
"""get connection and close it"""
conpath, connection = self.getConnection()
connection.Close(False, 'good bye')
loop.run()
self.failUnlessEqual(self.events, [('abort',)])
def testInvalidConnect(self):
"""get connection, send invalid initial message"""
conpath, connection = self.getConnection()
try:
connection.Process('1234', 'invalid message type')
except dbus.DBusException, ex:
self.failUnlessEqual(str(ex),
'org.syncevolution.Exception: message type not supported for starting a sync')
loop.run()
self.failUnlessEqual(self.events, [('abort',)])
def testStartSync(self):
"""send a valid initial SyncML message"""
conpath, connection = self.getConnection()
connection.Process(TestConnection.message1, 'application/vnd.syncml+xml')
loop.run()
# TODO: check events
self.failIfEqual(self.reply, None)
self.failUnlessEqual(self.reply[1], 'application/vnd.syncml+xml')
self.failUnlessEqual(self.reply[3], False)
self.failIfEqual(self.reply[4], '')
connection.Close(False, 'good bye')
loop.run()
# TODO: this shouldn't be necessary, but somehow the
# syncevo-dbus-server keeps running unless we give it some time.
# Incomplete handling of SIGTERM?! See Bugzilla #7555.
time.sleep(5)
if __name__ == '__main__':
unittest.main()