syncevo-dbus-server: shut down after on-disk changes are observed (BMC #14955)

syncevo-dbus-server must restart after its installation was updated or removed.
Otherwise further sync attempts can fail. This was seen in practice when
SyncEvolution 1.0 was updated to 1.1 (Debian bug #617320): the in-memory daemon
used an old libsynthesis, but the on-disk XML files required more recent
libsynthesis features.

In general, *any* update of something loaded into memory should
trigger a shutdown or restart. A shutdown alone is okay when no
automatic sync scheduling is needed (auto sync off for all
configs). Clients will restart the daemon on demand. A restart is
needed otherwise because without it, automatic syncs would stop to
run.

This patch implements the shutdown part. Restart still needs to be
implemented. A 10 second delay is chosen between "file modified" and
"shut down". This is meant to ensure that a future restart does not
occur too soon (before all file changes are done). It's still a bit
racy, but a better solution would depend integration into
distro-specific hooks ("package update complete"), which is hard or
impossible (installation via "make install" or "tar xf").

This new feature is tested by test-dbus.py, including several corner
cases:
- testShutdown: files modified in regular intervals for a while
- testSession: a running session prevents the shutdown
- testSession2: same as testSession, with different timing
This commit is contained in:
Patrick Ohly 2011-03-28 13:52:28 +02:00
parent 6e071edff3
commit 23464d616a
2 changed files with 299 additions and 22 deletions

View File

@ -41,6 +41,7 @@ static DBusMessage *SyncEvoHandleException(DBusMessage *msg);
#include <syncevo/SyncML.h>
#include <syncevo/FileConfigNode.h>
#include <syncevo/Cmdline.h>
#include <syncevo/GLibSupport.h>
#include <synthesis/san.h>
@ -58,6 +59,7 @@ static DBusMessage *SyncEvoHandleException(DBusMessage *msg);
#include <iostream>
#include <limits>
#include <cmath>
#include <fstream>
#include <boost/shared_ptr.hpp>
#include <boost/weak_ptr.hpp>
#include <boost/noncopyable.hpp>
@ -222,6 +224,8 @@ public:
void activate(int seconds,
const boost::function<bool ()> &callback)
{
deactivate();
m_callback = callback;
m_tag = g_timeout_add_seconds(seconds, triggered, static_cast<gpointer>(this));
if (!m_tag) {
@ -1091,6 +1095,25 @@ class DBusServer : public DBusObjectHelper,
typedef std::list< std::pair< boost::shared_ptr<Watch>, boost::shared_ptr<Client> > > Clients_t;
Clients_t m_clients;
/**
* Watch all files mapped into our address space. When
* modifications are seen (as during a package upgrade), queue a
* high priority session. This prevents running other sessions,
* which might not be able to execute correctly. For example, a
* sync with libsynthesis from 1.1 does not work with
* SyncEvolution XML files from 1.2. The dummy session then waits
* for the changes to settle (see SHUTDOWN_QUIESENCE_SECONDS) and
* either shuts down or restarts. The latter is necessary if the
* daemon has automatic syncing enabled in a config.
*/
list< boost::shared_ptr<GLibNotify> > m_files;
void fileModified();
/**
* session handling the shutdown in response to file modifications
*/
boost::shared_ptr<Session> m_shutdownSession;
/* Event source that regurally pool network manager
* */
GLibEvent m_pollConnman;
@ -1454,6 +1477,19 @@ public:
*/
std::string getNextSession();
/**
* Number of seconds to wait after file modifications are observed
* before shutting down or restarting. Shutting down could be done
* immediately, but restarting might not work right away. 10
* seconds was chosen because every single package is expected to
* be upgraded on disk in that interval. If a long-running system
* upgrade replaces additional packages later, then the server
* might restart multiple times during a system upgrade. Because it
* never runs operations directly after starting, that shouldn't
* be a problem.
*/
static const int SHUTDOWN_QUIESENCE_SECONDS = 10;
AutoSyncManager &getAutoSyncManager() { return m_autoSync; }
/**
@ -1979,11 +2015,15 @@ class Session : public DBusObjectHelper,
/** the number of sources that have been restored */
int m_restoreSrcEnd;
/**
* status of the session
*/
enum RunOperation {
OP_SYNC = 0,
OP_RESTORE = 1,
OP_CMDLINE = 2,
OP_NULL
OP_SYNC, /**< running a sync */
OP_RESTORE, /**< restoring data */
OP_CMDLINE, /**< executing command line */
OP_SHUTDOWN, /**< will shutdown server as soon as possible */
OP_NULL /**< idle, accepting commands via D-Bus */
};
static string runOpToString(RunOperation op);
@ -1996,6 +2036,26 @@ class Session : public DBusObjectHelper,
/** Cmdline to execute command line args */
boost::shared_ptr<CmdlineWrapper> m_cmdline;
/**
* time of latest file modification relevant for shutdown
*/
Timespec m_shutdownLastMod;
/**
* timer which counts seconds until server is meant to shut down:
* set only while the session is active and thus shutdown is allowed
*/
Timeout m_shutdownTimer;
/**
* Called Server::SHUTDOWN_QUIESENCE_SECONDS after last file modification,
* while shutdown session is active and thus ready to shut down the server.
* Then either triggers the shutdown or restarts.
*
* @return always false to disable timer
*/
bool shutdownServer();
/** Session.Attach() */
void attach(const Caller_t &caller);
@ -2080,7 +2140,8 @@ public:
PRI_CMDLINE = -10,
PRI_DEFAULT = 0,
PRI_CONNECTION = 10,
PRI_AUTOSYNC = 20
PRI_AUTOSYNC = 20,
PRI_SHUTDOWN = 256 // always higher than anything else
};
/**
@ -2089,6 +2150,20 @@ public:
void setPriority(int priority) { m_priority = priority; }
int getPriority() const { return m_priority; }
/**
* Turns session into one which will shut down the server, must
* be called before enqueing it. Will wait for a certain idle period
* after file modifications before claiming to be ready for running
* (see Server::SHUTDOWN_QUIESENCE_SECONDS).
*/
void startShutdown();
/**
* Called by server to tell shutdown session that a file was modified.
* Session uses that to determine when the quiesence period is over.
*/
void shutdownFileModified();
void initServer(SharedBuffer data, const std::string &messageType);
void setConnection(const boost::shared_ptr<Connection> c) { m_connection = c; m_useConnection = c; }
boost::weak_ptr<Connection> getConnection() { return m_connection; }
@ -3597,8 +3672,48 @@ Session::~Session()
done();
}
void Session::startShutdown()
{
m_runOperation = OP_SHUTDOWN;
}
void Session::shutdownFileModified()
{
m_shutdownLastMod = Timespec::monotonic();
SE_LOG_DEBUG(NULL, NULL, "file modified at %lu.%09lus, %s",
(unsigned long)m_shutdownLastMod.tv_sec,
(unsigned long)m_shutdownLastMod.tv_nsec,
m_active ? "active" : "not active");
if (m_active) {
// (re)set shutdown timer: once it fires, we are ready to shut down;
// brute-force approach, will reset timer many times
m_shutdownTimer.activate(DBusServer::SHUTDOWN_QUIESENCE_SECONDS,
boost::bind(&Session::shutdownServer, this));
}
}
bool Session::shutdownServer()
{
Timespec now = Timespec::monotonic();
SE_LOG_DEBUG(NULL, NULL, "shut down server at %lu.%09lu because of file modifications",
now.tv_sec, now.tv_nsec);
if (false /* restart? */) {
// TODO: suitable exec() call which restarts the server using the same environment it was in
// when it was started
} else {
// leave server now
shutdownRequested = true;
g_main_loop_quit(loop);
SE_LOG_INFO(NULL, NULL, "server shutting down because files loaded into memory were modified on disk");
}
return false;
}
void Session::setActive(bool active)
{
bool oldActive = m_active;
m_active = active;
if (active) {
if (m_syncStatus == SYNC_QUEUEING) {
@ -3610,6 +3725,29 @@ void Session::setActive(bool active)
if (c) {
c->ready();
}
if (!oldActive &&
m_runOperation == OP_SHUTDOWN) {
// shutdown session activated: check if or when we can shut down
if (m_shutdownLastMod) {
Timespec now = Timespec::monotonic();
SE_LOG_DEBUG(NULL, NULL, "latest file modified at %lu.%09lus, now is %lu.%09lus",
(unsigned long)m_shutdownLastMod.tv_sec,
(unsigned long)m_shutdownLastMod.tv_nsec,
(unsigned long)now.tv_sec,
(unsigned long)now.tv_nsec);
if (m_shutdownLastMod + DBusServer::SHUTDOWN_QUIESENCE_SECONDS <= now) {
// ready to shutdown immediately
shutdownServer();
} else {
// need to wait
int secs = DBusServer::SHUTDOWN_QUIESENCE_SECONDS -
(now - m_shutdownLastMod).tv_sec;
SE_LOG_DEBUG(NULL, NULL, "shut down in %ds", secs);
m_shutdownTimer.activate(secs, boost::bind(&Session::shutdownServer, this));
}
}
}
}
}
@ -3792,6 +3930,13 @@ void Session::run()
}
m_setConfig = m_cmdline->configWasModified();
break;
case OP_SHUTDOWN:
// block until time for shutdown or restart if no
// shutdown requested already
if (!shutdownRequested) {
g_main_loop_run(loop);
}
break;
default:
break;
};
@ -5319,8 +5464,59 @@ DBusServer::~DBusServer()
LoggerBase::popLogger();
}
void DBusServer::fileModified()
{
if (!m_shutdownSession) {
string newSession = getNextSession();
vector<string> flags;
flags.push_back("no-sync");
m_shutdownSession = Session::createSession(*this,
"", "",
newSession,
flags);
m_shutdownSession->setPriority(Session::PRI_AUTOSYNC);
m_shutdownSession->startShutdown();
enqueue(m_shutdownSession);
}
m_shutdownSession->shutdownFileModified();
}
void DBusServer::run()
{
// This has the intended side effect that it loads everything into
// memory which might be dynamically loadable, like backend
// plugins.
StringMap map = getVersions();
SE_LOG_DEBUG(NULL, NULL, "D-Bus server ready to run, versions:");
BOOST_FOREACH(const StringPair &entry, map) {
SE_LOG_DEBUG(NULL, NULL, "%s: %s", entry.first.c_str(), entry.second.c_str());
}
// Now that everything is loaded, check memory map for files which we have to monitor.
set<string> files;
ifstream in("/proc/self/maps");
while (!in.eof()) {
string line;
getline(in, line);
size_t off = line.find('/');
if (off != line.npos &&
line.find(" r-xp ") != line.npos) {
files.insert(line.substr(off));
}
}
in.close();
BOOST_FOREACH(const string &file, files) {
try {
SE_LOG_DEBUG(NULL, NULL, "watching: %s", file.c_str());
boost::shared_ptr<GLibNotify> notify(new GLibNotify(file.c_str(), boost::bind(&DBusServer::fileModified, this)));
m_files.push_back(notify);
} catch (...) {
// ignore errors for indidividual files
Exception::handle();
}
}
while (!shutdownRequested) {
if (!m_activeSession ||
!m_activeSession->readyToRun()) {
@ -5345,9 +5541,9 @@ void DBusServer::run()
}
session.swap(m_syncSession);
dequeue(session.get());
}
}
if (m_autoSync.hasTask()) {
if (!shutdownRequested && m_autoSync.hasTask()) {
// if there is at least one pending task and no session is created for auto sync,
// pick one task and create a session
m_autoSync.startTask();
@ -5356,7 +5552,7 @@ void DBusServer::run()
// Otherwise activeSession is owned by AutoSyncManager but it never
// be ready to run. Because methods of Session, like 'sync', are able to be
// called when it is active.
if (m_autoSync.hasActiveSession())
if (!shutdownRequested && m_autoSync.hasActiveSession())
{
// if the autosync is the active session, then invoke 'sync'
// to make it ready to run
@ -6434,7 +6630,9 @@ int main(int argc, char **argv)
redirectPtr = &redirect;
// make daemon less chatty - long term this should be a command line option
LoggerBase::instance().setLevel(LoggerBase::INFO);
LoggerBase::instance().setLevel(getenv("SYNCEVOLUTION_DEBUG") ?
LoggerBase::DEBUG :
LoggerBase::INFO);
DBusErrorCXX err;
DBusConnectionPtr conn = b_dbus_setup_bus(DBUS_BUS_SESSION,

View File

@ -32,6 +32,7 @@ from dbus.mainloop.glib import DBusGMainLoop
import dbus.service
import gobject
import sys
import re
# introduced in python-gobject 2.16, not available
# on all Linux distros => make it optional
@ -237,6 +238,7 @@ class DBusUtil(Timeout):
events = []
quit_events = []
reply = None
pserver = None
def getTestProperty(self, key, default):
"""retrieve values set with @property()"""
@ -292,8 +294,8 @@ class DBusUtil(Timeout):
if debugger:
print "\n%s: %s\n" % (self.id(), self.shortDescription())
pserver = subprocess.Popen([debugger] + server,
env=env)
DBusUtil.pserver = subprocess.Popen([debugger] + server,
env=env)
while True:
check = subprocess.Popen("ps x | grep %s | grep -w -v -e %s -e grep -e ps" % \
@ -308,10 +310,10 @@ class DBusUtil(Timeout):
time.sleep(2)
break
else:
pserver = subprocess.Popen(server + serverArgs,
env=env,
stdout=open(syncevolog, "w"),
stderr=subprocess.STDOUT)
DBusUtil.pserver = subprocess.Popen(server + serverArgs,
env=env,
stdout=open(syncevolog, "w"),
stderr=subprocess.STDOUT)
while os.path.getsize(syncevolog) == 0:
time.sleep(1)
@ -347,11 +349,11 @@ class DBusUtil(Timeout):
print "\ndone, quit gdb now\n"
hasfailed = numerrors + numfailures != len(result.errors) + len(result.failures)
if not debugger:
os.kill(pserver.pid, signal.SIGTERM)
pserver.communicate()
if not debugger and DBusUtil.pserver.poll() == None:
os.kill(DBusUtil.pserver.pid, signal.SIGTERM)
DBusUtil.pserver.communicate()
serverout = open(syncevolog).read()
if pserver.returncode and pserver.returncode != -15:
if DBusUtil.pserver.returncode and pserver.returncode != -15:
hasfailed = True
if hasfailed:
# give D-Bus time to settle down
@ -361,10 +363,10 @@ class DBusUtil(Timeout):
monitorout = open(dbuslog).read()
report = "\n\nD-Bus traffic:\n%s\n\nserver output:\n%s\n" % \
(monitorout, serverout)
if pserver.returncode and pserver.returncode != -15:
if DBusUtil.pserver.returncode and DBusUtil.pserver.returncode != -15:
# create a new failure specifically for the server
result.errors.append((self,
"server terminated with error code %d%s" % (pserver.returncode, report)))
"server terminated with error code %d%s" % (DBusUtil.pserver.returncode, report)))
elif numerrors != len(result.errors):
# append report to last error
result.errors[-1] = (result.errors[-1][0], result.errors[-1][1] + report)
@ -372,6 +374,21 @@ class DBusUtil(Timeout):
# same for failure
result.failures[-1] = (result.failures[-1][0], result.failures[-1][1] + report)
def isServerRunning(self):
"""True while the syncevo-dbus-server executable is still running"""
return DBusUtil.pserver and DBusUtil.pserver.poll() == None
def serverExecutable(self):
"""returns full path of currently running syncevo-dbus-server binary"""
self.failUnless(self.isServerRunning())
maps = open("/proc/%d/maps" % DBusUtil.pserver.pid, "r")
regex = re.compile(r'[0-9a-f]*-[0-9a-f]* r-xp [0-9a-f]* [^ ]* \d* *(.*)\n')
for line in maps:
match = regex.match(line)
if match:
return match.group(1)
self.fail("no executable found")
def setUpServer(self):
self.server = dbus.Interface(bus.get_object('org.syncevolution',
'/org/syncevolution/Server'),
@ -2514,7 +2531,69 @@ END:VCARD''')
def run(self, result):
self.runTest(result)
class TestFileNotify(unittest.TestCase, DBusUtil):
"""syncevo-dbus-server must stop if one of its files mapped into
memory (executable, libraries) change. Furthermore it must restart
if automatic syncs are enabled. This class simulates such file changes
by starting the server, identifying the location of the main executable,
and renaming it back and forth."""
def setUp(self):
self.setUpServer()
self.serverexe = self.serverExecutable()
def tearDown(self):
if os.path.isfile(self.serverexe + ".bak"):
os.rename(self.serverexe + ".bak", self.serverexe)
def run(self, result):
self.runTest(result)
def modifyServerFile(self):
"""rename server executable to trigger shutdown"""
os.rename(self.serverexe, self.serverexe + ".bak")
os.rename(self.serverexe + ".bak", self.serverexe)
@timeout(100)
def testShutdown(self):
"""update server binary for 30 seconds, check that it shuts down at most 15 seconds after last mod"""
self.failUnless(self.isServerRunning())
i = 0
# Server must not shut down immediately, more changes might follow.
# Simulate that.
while i < 6:
self.modifyServerFile()
time.sleep(5)
i = i + 1
self.failUnless(self.isServerRunning())
time.sleep(10)
self.failIf(self.isServerRunning())
@timeout(30)
def testSession(self):
"""create session, shut down directly after closing it"""
self.failUnless(self.isServerRunning())
self.setUpSession("")
self.modifyServerFile()
time.sleep(15)
self.failUnless(self.isServerRunning())
self.session.Detach()
# should shut down almost immediately
time.sleep(1)
self.failIf(self.isServerRunning())
@timeout(30)
def testSession2(self):
"""create session, shut down after quiesence period after closing it"""
self.failUnless(self.isServerRunning())
self.setUpSession("")
self.modifyServerFile()
self.failUnless(self.isServerRunning())
self.session.Detach()
time.sleep(8)
self.failUnless(self.isServerRunning())
time.sleep(4)
self.failIf(self.isServerRunning())
if __name__ == '__main__':
unittest.main()