PIM Manager: make some of the D-Bus methods thread-safe

This adds the infrastructure for shifting the work of the D-Bus
methods into the main thread if called by another thread, which may
happen when we add other bindings for them.

Work is shifted to the main thread via a GAsyncQueue +
g_main_context_wakeup(). The calling thread then blocks waiting on a
condition variable until signaled by the main thread. Results are
stored for the calling thread as part of the operation that it
provides. Exceptions in the main thread are caught+serialized as
string and deserialized+rethrown in the calling thread - a bit crude,
but should work and reuses existing code.
This commit is contained in:
Patrick Ohly 2013-02-06 10:59:30 +01:00
parent 501c32c06d
commit fecfd3f69d
3 changed files with 144 additions and 1 deletions

View file

@ -76,6 +76,8 @@ Manager::Manager(const boost::shared_ptr<Server> &server) :
DBusObjectHelper(server->getConnection(),
MANAGER_PATH,
MANAGER_IFACE),
m_mainThread(g_thread_self()),
m_taskQueue(GAsyncQueueStealCXX(g_async_queue_new())),
m_server(server),
m_locale(LocaleFactory::createFactory())
{
@ -93,6 +95,8 @@ Manager::~Manager()
void Manager::init()
{
m_taskQueueFlush.activate(-1, boost::bind(&Manager::checkTaskQueueOnIdle, this));
// Restore sort order and active databases.
m_configNode.reset(new IniFileConfigNode(SubstEnvironment("${XDG_CONFIG_HOME}/syncevolution"),
"pim-manager.ini",
@ -140,6 +144,89 @@ void Manager::init()
boost::function<void (bool)>());
}
struct TaskForMain
{
GMutex m_mutex;
GCond m_cond;
bool m_done;
boost::function<void ()> m_operation;
boost::function<void ()> m_rethrow;
};
template <class R> void AssignResult(const boost::function<R ()> &operation,
R &res)
{
res = operation();
}
template <class R> R Manager::runInMainRes(const boost::function<R ()> &operation)
{
// Prepare task.
R res;
TaskForMain task;
g_mutex_init(&task.m_mutex);
g_cond_init(&task.m_cond);
task.m_done = false;
task.m_operation = boost::bind(&AssignResult<R>, boost::cref(operation), boost::ref(res));
// Run in main.
g_async_queue_push(m_taskQueue, &task);
g_main_context_wakeup(NULL);
g_mutex_lock(&task.m_mutex);
while (!task.m_done) {
g_cond_wait(&task.m_cond, &task.m_mutex);
}
g_mutex_unlock(&task.m_mutex);
// Rethrow exceptions (optional) and return result.
g_cond_clear(&task.m_cond);
g_mutex_clear(&task.m_mutex);
if (task.m_rethrow) {
task.m_rethrow();
}
return res;
}
static int Return1(const boost::function<void ()> &operation)
{
operation();
return 1;
}
void Manager::runInMainVoid(const boost::function<void ()> &operation)
{
runInMainRes<int>(boost::bind(Return1, boost::cref(operation)));
}
bool Manager::checkTaskQueueOnIdle()
{
TaskForMain *task;
while ((task = static_cast<TaskForMain *>(g_async_queue_try_pop(m_taskQueue))) != NULL) {
g_mutex_lock(&task->m_mutex);
// Exceptions must be reported back to the original thread.
// This is done by serializing them as string, then using the
// existing Exception::tryRethrow() to turn that string back
// into an instance of the right class.
try {
task->m_operation();
} catch (...) {
std::string explanation;
Exception::handle(explanation);
task->m_rethrow = boost::bind(Exception::tryRethrow, explanation, true);
}
// Wake up task.
task->m_done = true;
g_cond_signal(&task->m_cond);
g_mutex_unlock(&task->m_mutex);
}
// Keep checking.
return true;
}
void Manager::initFolks()
{
m_folks = IndividualAggregator::create(m_locale);
@ -177,6 +264,11 @@ boost::shared_ptr<GDBusCXX::DBusObjectHelper> CreateContactManager(const boost::
void Manager::start()
{
if (!isMain()) {
runInMainV(&Manager::start);
return;
}
if (!m_preventingAutoTerm) {
// Prevent automatic shut down during idle times, because we need
// to keep our unified address book available.
@ -188,6 +280,11 @@ void Manager::start()
void Manager::stop()
{
if (!isMain()) {
runInMainV(&Manager::stop);
return;
}
// If there are no active searches, then recreate aggregator.
// Instead of tracking open views, use the knowledge that an
// idle server has only two references to the main view:
@ -208,11 +305,20 @@ void Manager::stop()
bool Manager::isRunning()
{
if (!isMain()) {
return runInMainR(&Manager::isRunning);
}
return m_folks->isRunning();
}
void Manager::setSortOrder(const std::string &order)
{
if (!isMain()) {
runInMainV(&Manager::setSortOrder, order);
return;
}
if (order == getSortOrder()) {
// Nothing to do.
return;
@ -226,6 +332,15 @@ void Manager::setSortOrder(const std::string &order)
m_sortOrder = order;
}
std::string Manager::getSortOrder()
{
if (!isMain()) {
return runInMainR(&Manager::getSortOrder);
}
return m_sortOrder;
}
/**
* Connects a normal IndividualView to a D-Bus client.
* Provides the org.01.pim.contacts.ViewControl API.
@ -651,6 +766,8 @@ void Manager::search(const boost::shared_ptr< GDBusCXX::Result1<GDBusCXX::DBusOb
const LocaleFactory::Filter_t &filter,
const GDBusCXX::DBusObject_t &agentPath)
{
// TODO: figure out a native, thread-safe API for this.
// Start folks in parallel with asking for an ESourceRegistry.
start();

View file

@ -38,6 +38,9 @@ SE_BEGIN_CXX
*/
class Manager : public GDBusCXX::DBusObjectHelper
{
GThread *m_mainThread;
GAsyncQueueCXX m_taskQueue;
Timeout m_taskQueueFlush;
boost::weak_ptr<Manager> m_self;
boost::shared_ptr<Server> m_server;
boost::shared_ptr<IndividualAggregator> m_folks;
@ -73,7 +76,7 @@ class Manager : public GDBusCXX::DBusObjectHelper
/** Manager.SetSortOrder() */
void setSortOrder(const std::string &order);
/** Manager.GetSortOrder() */
std::string getSortOrder() { return m_sortOrder; }
std::string getSortOrder();
/** Manager.Search() */
void search(const boost::shared_ptr< GDBusCXX::Result1<GDBusCXX::DBusObject_t> > &result,
const GDBusCXX::Caller_t &ID,
@ -173,6 +176,28 @@ class Manager : public GDBusCXX::DBusObjectHelper
const boost::shared_ptr<GDBusCXX::Result> &result,
const boost::function<void (const boost::shared_ptr<Session> &session)> &callback);
/** true if the current thread is the one handling the event loop and running all operations */
bool isMain() { return g_thread_self() == m_mainThread; }
/**
* Runs the operation inside the main thread and returns once the
* main thread is done with it.
*/
void runInMainVoid(const boost::function<void ()> &operation);
template <class R> R runInMainRes(const boost::function<R ()> &operation);
void runInMainV(void (Manager::*method)()) { runInMainVoid(boost::bind(method, this)); }
template <class R> R runInMainR(R (Manager::*method)()) { return runInMainRes<R>(boost::bind(method, this)); }
template <class A1, class B1> void runInMainV(void (Manager::*method)(B1), A1 a1) { runInMainVoid(boost::bind(method, this, a1)); }
template <class R, class A1, class B1> R runInMainR(R (Manager::*method)(B1), A1 a1) { return runInMainRes<R>(boost::bind(method, this, a1)); }
/**
* An idle callback which checks the task queue. runInMainV()
* wakes up the context to ensure that the idle callback is
* invoked.
*/
bool checkTaskQueueOnIdle();
public:
/**
* Creates an instance of the Manager which runs as part

View file

@ -301,6 +301,7 @@ SE_END_CXX
SE_GOBJECT_TYPE(GFile)
SE_GOBJECT_TYPE(GFileMonitor)
SE_GLIB_TYPE(GMainLoop, g_main_loop)
SE_GLIB_TYPE(GAsyncQueue, g_async_queue)
SE_BEGIN_CXX