syncevo-dbus-server: kill old session(s) when the same client connects again (MB#7710)

The server now checks whether it has other sessions active or in the
queue with the same device ID and kills the older sessions when adding
a new one. Because it does this on each Process(), it is unlikely
(impossible?!) to to have more than one older session, but because the
session might be running or still inactive in the queue, both are
checked.

As part of testing with test-dbus.py it was observed that a connection
that was to be aborted was also closed normally first. That was because
the sync engine did not set an error status in this case. Better check
for abort requests explicitly before calling agent->shutdown(). However,
this should be fixed as part of the server progress handling (MB#7709).
This commit is contained in:
Patrick Ohly 2009-11-09 21:10:10 +01:00
parent a20aa47c29
commit 2a8cd7396f
3 changed files with 169 additions and 9 deletions

View File

@ -342,6 +342,13 @@ public:
*/
void enqueue(const boost::shared_ptr<Session> &session);
/**
* Remove all sessions with this device ID from the
* queue. If the active session also has this ID,
* the session will be aborted and/or deactivated.
*/
int killSessions(const std::string &peerDeviceID);
/**
* Remove a session from the work queue. If it is running a sync,
* it will keep running and nothing will change. Otherwise, if it
@ -749,6 +756,7 @@ class Session : public DBusObjectHelper,
DBusServer &m_server;
const std::string m_sessionID;
ReadOperations m_operations;
std::string m_peerDeviceID;
bool m_serverMode;
SharedBuffer m_initialMessage;
@ -849,6 +857,7 @@ class Session : public DBusObjectHelper,
public:
Session(DBusServer &server,
const std::string &peerDeviceID,
const std::string &config_name,
const std::string &session);
~Session();
@ -887,6 +896,8 @@ public:
DBusServer &getServer() { return m_server; }
std::string getConfigName() { return m_operations.m_configName; }
std::string getSessionID() const { return m_sessionID; }
std::string getPeerDeviceID() const { return m_peerDeviceID; }
/**
* TRUE if the session is ready to take over control
@ -1590,6 +1601,7 @@ string Session::syncStatusToString(SyncStatus state)
}
Session::Session(DBusServer &server,
const std::string &peerDeviceID,
const std::string &config_name,
const std::string &session) :
DBusObjectHelper(server.getConnection(),
@ -1598,6 +1610,7 @@ Session::Session(DBusServer &server,
m_server(server),
m_sessionID(session),
m_operations(config_name),
m_peerDeviceID(peerDeviceID),
m_serverMode(false),
m_useConnection(false),
m_active(false),
@ -2024,9 +2037,10 @@ void Connection::process(const Caller_t &caller,
const std::pair<size_t, const uint8_t *> &message,
const std::string &message_type)
{
SE_LOG_DEBUG(NULL, NULL, "D-Bus client %s sends %lu bytes, %s",
SE_LOG_DEBUG(NULL, NULL, "D-Bus client %s sends %lu bytes via connection %s, %s",
caller.c_str(),
message.first,
getPath(),
message_type.c_str());
boost::shared_ptr<Client> client(m_server.findClient(caller));
@ -2045,6 +2059,7 @@ void Connection::process(const Caller_t &caller,
switch (m_state) {
case SETUP: {
std::string config;
std::string peerDeviceID;
bool serverMode = false;
// check message type, determine whether we act
// as client or server, choose config
@ -2198,13 +2213,18 @@ void Connection::process(const Caller_t &caller,
throw runtime_error(string("no configuration found for ") +
info.toString());
}
// abort previous session of this client
m_server.killSessions(info.m_deviceID);
peerDeviceID = info.m_deviceID;
} else {
throw runtime_error("message type not supported for starting a sync");
}
// run session as client (server not supported yet)
// run session as client or server
m_state = PROCESSING;
m_session.reset(new Session(m_server,
peerDeviceID,
config,
m_sessionID));
if (serverMode) {
@ -2258,8 +2278,9 @@ void Connection::close(const Caller_t &caller,
bool normal,
const std::string &error)
{
SE_LOG_DEBUG(NULL, NULL, "D-Bus client %s closes %s%s%s",
SE_LOG_DEBUG(NULL, NULL, "D-Bus client %s closes connection %s %s%s%s",
caller.c_str(),
getPath(),
normal ? "normally" : "with error",
error.empty() ? "" : ": ",
error.c_str());
@ -2559,8 +2580,9 @@ void DBusServer::connect(const Caller_t &caller,
new_session,
peer,
must_authenticate));
SE_LOG_DEBUG(NULL, NULL, "connecting D-Bus client %s with '%s'",
SE_LOG_DEBUG(NULL, NULL, "connecting D-Bus client %s with connection %s '%s'",
caller.c_str(),
c->getPath(),
c->m_description.c_str());
boost::shared_ptr<Client> client = addClient(getConnection(),
@ -2582,6 +2604,7 @@ void DBusServer::startSession(const Caller_t &caller,
watch);
std::string new_session = getNextSession();
boost::shared_ptr<Session> session(new Session(*this,
"is this a client or server session?",
server,
new_session));
// TODO: how do we decide whether this is a client or server session?
@ -2715,6 +2738,48 @@ void DBusServer::enqueue(const boost::shared_ptr<Session> &session)
checkQueue();
}
int DBusServer::killSessions(const std::string &peerDeviceID)
{
int count = 0;
WorkQueue_t::iterator it = m_workQueue.begin();
while (it != m_workQueue.end()) {
boost::shared_ptr<Session> session = it->lock();
if (session && session->getPeerDeviceID() == peerDeviceID) {
SE_LOG_DEBUG(NULL, NULL, "removing pending session %s because it matches deviceID %s",
session->getSessionID().c_str(),
peerDeviceID.c_str());
// remove session and its corresponding connection
boost::shared_ptr<Connection> c = session->getConnection().lock();
if (c) {
c->shutdown();
}
it = m_workQueue.erase(it);
count++;
} else {
++it;
}
}
if (m_activeSession &&
m_activeSession->getPeerDeviceID() == peerDeviceID) {
SE_LOG_DEBUG(NULL, NULL, "aborting active session %s because it matches deviceID %s",
m_activeSession->getSessionID().c_str(),
peerDeviceID.c_str());
try {
// abort, even if not necessary right now
m_activeSession->abort();
} catch (...) {
// TODO: catch only that exception which indicates
// incorrect use of the function
}
dequeue(m_activeSession);
count++;
}
return count;
}
void DBusServer::dequeue(Session *session)
{
if (m_syncSession.get() == session) {

View File

@ -2227,7 +2227,7 @@ SyncMLStatus SyncContext::doSync()
// If we get here without error, then close down connection normally.
// Otherwise destruct the agent without further communication.
if (!status) {
if (!status && !checkForAbort()) {
try {
agent->shutdown();
// TODO: implement timeout for peers which fail to respond

View File

@ -54,6 +54,7 @@ class DBusUtil:
def __init__(self):
self.events = []
self.quit_events = []
self.reply = None
def runTest(self, result, own_xdg=True):
@ -174,6 +175,10 @@ class DBusUtil:
def status(*args):
self.events.append(("status", args))
if args[0] == "done":
if sessionpath:
self.quit_events.append("session " + sessionpath + " done")
else:
self.quit_events.append("session done")
loop.quit()
bus.add_signal_receiver(progress,
'ProgressChanged',
@ -195,9 +200,15 @@ class DBusUtil:
getting an abort"""
def abort():
self.events.append(("abort",))
self.quit_events.append("connection " + conpath + " aborted")
loop.quit()
def reply(*args):
self.reply = args
if args[3]:
self.quit_events.append("connection " + conpath + " got final reply")
else:
self.quit_events.append("connection " + conpath + " got reply")
loop.quit()
bus.add_signal_receiver(abort,
'Abort',
@ -359,6 +370,8 @@ class TestConnection(unittest.TestCase, DBusUtil):
conpath, connection = self.getConnection()
connection.Process(TestConnection.message1, 'application/vnd.syncml+xml')
loop.run()
self.failUnlessEqual(self.quit_events, ["connection " + conpath + " got reply"])
self.quit_events = []
# TODO: check events
self.failIfEqual(self.reply, None)
self.failUnlessEqual(self.reply[1], 'application/vnd.syncml+xml')
@ -366,10 +379,92 @@ class TestConnection(unittest.TestCase, DBusUtil):
self.failIfEqual(self.reply[4], '')
connection.Close(False, 'good bye')
loop.run()
# TODO: this shouldn't be necessary, but somehow the
# syncevo-dbus-server keeps running unless we give it some time.
# Incomplete handling of SIGTERM?! See Bugzilla #7555.
time.sleep(5)
loop.run()
self.failUnlessEqual(self.quit_events, ["connection " + conpath + " aborted",
"session done"])
def testStartSyncTwice(self):
"""send the same SyncML message twice, starting two sessions"""
conpath, connection = self.getConnection()
connection.Process(TestConnection.message1, 'application/vnd.syncml+xml')
loop.run()
# TODO: check events
self.failUnlessEqual(self.quit_events, ["connection " + conpath + " got reply"])
self.failIfEqual(self.reply, None)
self.failUnlessEqual(self.reply[1], 'application/vnd.syncml+xml')
self.failUnlessEqual(self.reply[3], False)
self.failIfEqual(self.reply[4], '')
self.reply = None
self.quit_events = []
# Now start another session with the same client *without*
# closing the first one. The server should detect this
# and forcefully close the first one.
conpath2, connection2 = self.getConnection()
connection2.Process(TestConnection.message1, 'application/vnd.syncml+xml')
# reasons for leaving the loop, in random order:
# - abort of first connection
# - first session done
# - reply for second one
loop.run()
loop.run()
loop.run()
self.quit_events.sort()
expected = [ "connection " + conpath + " aborted",
"session done",
"connection " + conpath2 + " got reply" ]
expected.sort()
self.failUnlessEqual(self.quit_events, expected)
self.failIfEqual(self.reply, None)
self.failUnlessEqual(self.reply[1], 'application/vnd.syncml+xml')
self.failUnlessEqual(self.reply[3], False)
self.failIfEqual(self.reply[4], '')
self.quit_events = []
# now quit for good
connection2.Close(False, 'good bye')
loop.run()
loop.run()
self.failUnlessEqual(self.quit_events, ["connection " + conpath2 + " aborted",
"session done"])
def testKillInactive(self):
"""block server with client A, then let client B connect twice"""
conpath, connection = self.getConnection()
connection.Process(TestConnection.message1, 'application/vnd.syncml+xml')
loop.run()
# TODO: check events
self.failUnlessEqual(self.quit_events, ["connection " + conpath + " got reply"])
self.failIfEqual(self.reply, None)
self.failUnlessEqual(self.reply[1], 'application/vnd.syncml+xml')
self.failUnlessEqual(self.reply[3], False)
self.failIfEqual(self.reply[4], '')
self.reply = None
self.quit_events = []
# Now start two more sessions with the second client *without*
# closing the first one. The server should remove only the
# first connection of client B.
message1_clientB = TestConnection.message1.replace("sc-api-nat", "sc-pim-ppc")
conpath2, connection2 = self.getConnection()
connection2.Process(message1_clientB, 'application/vnd.syncml+xml')
conpath3, connection3 = self.getConnection()
connection3.Process(message1_clientB, 'application/vnd.syncml+xml')
loop.run()
self.failUnlessEqual(self.quit_events, [ "connection " + conpath2 + " aborted" ])
self.quit_events = []
# now quit for good
connection3.Close(False, 'good bye client B')
loop.run()
self.failUnlessEqual(self.quit_events, [ "connection " + conpath3 + " aborted" ])
self.quit_events = []
connection.Close(False, 'good bye client A')
loop.run()
loop.run()
self.failUnlessEqual(self.quit_events, ["connection " + conpath + " aborted",
"session done"])
if __name__ == '__main__':
unittest.main()