EDS contacts: read-ahead cache

Performance is improved by requesting multiple contacts at once and
overlapping reading with processing. On a fast system (SSD, CPU fast
enough to not be the limiting factor), testpim.py's testSync takes 8
seconds for a "match" sync where 1000 contacts get loaded and compared
against the same set of contacts. Read-ahead with only 1 contact per
query speeds that up to 6.7s due to overlapping IO and
processing. Read-ahead with the default 50 contacts per query takes
5.5s. It does not get much faster with larger queries.

While returning items from one cache populated with a single
e_book_client_get_contacts() call, another query is started to overlap
processing and loading.

To achieve efficient read-ahead, the backend relies on the hint given
to it via setReadAheadOrder(). As soon as it detects that a contact is
not the next one according to that order, it switches back to reading
one contact at a time. This happens during the write phase of a sync
where the Synthesis engine needs to read, update, and write back
changes based on updates sent by the peer.

Cache statistics show that this works well for --print-items, --export
and slow syncs.

Writing into the database must invalidate the corresponding cached
contact. Otherwise the backup operation after a sync may end up
reading stale data.
This commit is contained in:
Patrick Ohly 2013-06-27 11:13:53 +02:00
parent 7d12c0a586
commit 50c06bbe61
2 changed files with 414 additions and 17 deletions

View file

@ -35,18 +35,18 @@ using namespace std;
#include <syncevo/Logging.h>
#ifdef USE_EDS_CLIENT
#include <boost/range/algorithm/find.hpp>
#endif
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/join.hpp>
#include <boost/algorithm/string/predicate.hpp>
#include <boost/foreach.hpp>
#include <boost/lambda/lambda.hpp>
#include <syncevo/declarations.h>
void inline intrusive_ptr_add_ref(EBookQuery *ptr) { e_book_query_ref(ptr); }
void inline intrusive_ptr_release(EBookQuery *ptr) { e_book_query_unref(ptr); }
SE_BEGIN_CXX
typedef boost::intrusive_ptr<EBookQuery> EBookQueryCXX;
SE_END_CXX
SE_GLIB_TYPE(EBookQuery, e_book_query)
SE_BEGIN_CXX
@ -71,6 +71,14 @@ EvolutionContactSource::EvolutionContactSource(const SyncSourceParams &params,
EvolutionSyncSource(params),
m_vcardFormat(vcardFormat)
{
#ifdef USE_EDS_CLIENT
m_cacheMisses =
m_cacheStalls =
m_contactReads =
m_contactsFromDB =
m_contactQueries = 0;
m_readAheadOrder = READ_NONE;
#endif
SyncSourceLogging::init(InitList<std::string>("N_FIRST") + "N_MIDDLE" + "N_LAST",
" ",
m_operations);
@ -87,6 +95,10 @@ EvolutionContactSource::~EvolutionContactSource()
// TODO: cancel the operations().
finishItemChanges();
close();
#ifdef USE_EDS_CLIENT
// logCacheStats(Logger::DEBUG);
#endif
}
#ifdef USE_EDS_CLIENT
@ -366,7 +378,7 @@ void EvolutionContactSource::listAllItems(RevisionMap_t &revisions)
EBookQueryCXX allItemsQuery(e_book_query_any_field_contains(""), TRANSFER_REF);
PlainGStr sexp(e_book_query_to_string (allItemsQuery.get()));
if (!e_book_client_get_view_sync(m_addressbook, sexp, &view, NULL, gerror)) {
throwError( "getting the view" , gerror);
}
@ -461,17 +473,370 @@ string EvolutionContactSource::getRevision(const string &luid)
return rev;
}
#ifdef USE_EDS_CLIENT
/**
 * Result set of one batched contact read, keyed by contact UID.
 *
 * The creator inserts one (initially empty) entry per requested UID
 * and the completion callback of the asynchronous read fills in the
 * contacts which were actually returned. An entry which is still
 * empty after the read has finished therefore stands for a contact
 * that was requested but not delivered.
 */
class ContactCache : public std::map<std::string, EContactCXX>
{
public:
    /**
     * Starts out as "not running" so that the flag never holds an
     * indeterminate value; whoever starts the asynchronous read is
     * expected to set m_running to true.
     */
    ContactCache() : m_running(false) {}

    /** Asynchronous method call still pending. */
    bool m_running;
    /** The last luid requested in this query. Needed to start with the next contact after it. */
    std::string m_lastLUID;
    /** Result of batch read. Any error here means that the call failed completely. */
    GErrorCXX m_gerror;
    /** A debug logging name for this query. */
    std::string m_name;
};
/**
 * Stores the read-ahead hint (order plus the list of luids that comes
 * with it) and discards both read-ahead caches.
 */
void EvolutionContactSource::setReadAheadOrder(ReadAheadOrder order,
                                               const ReadAheadItems &luids)
{
    // Human-readable name of the order, for debug logging only.
    const char *orderName;
    switch (order) {
    case READ_NONE:
        orderName = "none";
        break;
    case READ_ALL_ITEMS:
        orderName = "all";
        break;
    case READ_CHANGED_ITEMS:
        orderName = "changed";
        break;
    case READ_SELECTED_ITEMS:
        orderName = "selected";
        break;
    default:
        orderName = "???";
        break;
    }
    SE_LOG_DEBUG(getDisplayName(), "reading: set order '%s', %ld luids",
                 orderName,
                 static_cast<long>(luids.size()));

    m_readAheadOrder = order;
    m_nextLUIDs = luids;

    // Drop everything that was cached so far, to be on the safe side:
    // - a cache with entries removed by invalidateCachedContact()
    //   has gaps which the "cache miss" counting would mistake for
    //   real misses
    // - concurrent changes in the DB are currently not detected by
    //   the cache, so we prefer to work with fairly recent data
    m_contactCache.reset();
    m_contactCacheNext.reset();
}
/**
 * Reports the current read-ahead state through the two output
 * parameters. Note that the order is the current value of
 * m_readAheadOrder, which startReading() resets to READ_NONE after a
 * cache prediction failure, so it may differ from what was last
 * passed to setReadAheadOrder().
 */
void EvolutionContactSource::getReadAheadOrder(ReadAheadOrder &order,
                                               ReadAheadItems &luids)
{
    luids = m_nextLUIDs;
    order = m_readAheadOrder;
}
/**
 * Turns a failed batch read into an exception.
 *
 * Does nothing if the cache recorded no error. Otherwise the cache is
 * dropped (the caller's pointer is reset) and the recorded error is
 * reported via throwError().
 *
 * @param cache   cache to check; must not be empty when called
 */
void EvolutionContactSource::checkCacheForError(boost::shared_ptr<ContactCache> &cache)
{
    if (cache->m_gerror) {
        GErrorCXX gerror;
        std::swap(gerror, cache->m_gerror);
        // The name is needed for the error message, so copy it
        // *before* releasing the cache: after reset() the pointer is
        // empty and must not be dereferenced anymore.
        const std::string name = cache->m_name;
        cache.reset();
        throwError(StringPrintf("reading contacts %s", name.c_str()), gerror);
    }
}
/**
 * Removes the contact with the given luid from both read-ahead caches
 * (the active one and the one being read ahead), if they exist and
 * contain it.
 */
void EvolutionContactSource::invalidateCachedContact(const std::string &luid)
{
invalidateCachedContact(m_contactCache, luid);
invalidateCachedContact(m_contactCacheNext, luid);
}
/**
 * Removes the entry for the given luid from one cache. An unset cache
 * pointer or a luid which is not part of the cache is silently ignored.
 */
void EvolutionContactSource::invalidateCachedContact(boost::shared_ptr<ContactCache> &cache, const std::string &luid)
{
    if (!cache) {
        // No such cache at the moment, nothing to invalidate.
        return;
    }

    ContactCache::iterator entry = cache->find(luid);
    if (entry == cache->end()) {
        return;
    }

    SE_LOG_DEBUG(getDisplayName(), "reading: remove contact %s from cache because of remove or update", luid.c_str());
    // Should the contact be read again later (unlikely), the
    // missing entry gets treated as a cache miss. That is fine:
    // combined with the counting of cache misses it helps to avoid
    // read-ahead while the Synthesis engine accesses contacts
    // randomly.
    cache->erase(entry);
}
bool EvolutionContactSource::getContact(const string &luid, EContact **contact, GErrorCXX &gerror)
{
SE_LOG_DEBUG(getDisplayName(), "reading: getting contact %s", luid.c_str());
ReadAheadOrder order = m_readAheadOrder;
// Use switch and let compiler tell us when we don't cover a case.
switch (m_accessMode) {
case SYNCHRONOUS:
order = READ_NONE;
break;
case BATCHED:
case DEFAULT:
order = m_readAheadOrder;
break;
};
m_contactReads++;
if (order == READ_NONE) {
m_contactsFromDB++;
m_contactQueries++;
return e_book_client_get_contact_sync(m_addressbook,
luid.c_str(),
contact,
NULL,
gerror);
} else {
return getContactFromCache(luid, contact, gerror);
}
}
/**
 * Serves a contact read from the read-ahead caches.
 *
 * The lookup proceeds in this order:
 * - active cache (m_contactCache) covers the luid: wait for the
 *   pending batch read if necessary, then use its result
 * - otherwise switch to the read-ahead cache (m_contactCacheNext)
 *   and retry recursively
 * - otherwise start a new batch read which begins with this luid
 *   and retry recursively, which then waits for that read
 *
 * Before returning, a follow-up batch read is started if none is
 * pending and the active cache has finished loading.
 *
 * @param luid      ID of the requested contact
 * @retval contact  NULL unless the contact was found in a cache
 * @retval gerror   set to E_BOOK_CLIENT_ERROR_CONTACT_NOT_FOUND when the
 *                  luid was part of a batch read which did not return it
 * @return true if gerror is unset at the end
 *
 * A batch read which failed as a whole is reported by
 * checkCacheForError() instead of via the return value.
 */
bool EvolutionContactSource::getContactFromCache(const string &luid, EContact **contact, GErrorCXX &gerror)
{
*contact = NULL;
// Use ContactCache.
if (m_contactCache) {
SE_LOG_DEBUG(getDisplayName(), "reading: active cache %s", m_contactCache->m_name.c_str());
// Ran into a problem?
checkCacheForError(m_contactCache);
// Does the cache cover our item?
ContactCache::const_iterator it = m_contactCache->find(luid);
if (it == m_contactCache->end()) {
if (m_contactCacheNext) {
SE_LOG_DEBUG(getDisplayName(), "reading: not in cache, try cache %s",
m_contactCacheNext->m_name.c_str());
// Throw away old cache, try with next one. This is not
// a cache miss (yet).
m_contactCache = m_contactCacheNext;
m_contactCacheNext.reset();
return getContactFromCache(luid, contact, gerror);
} else {
SE_LOG_DEBUG(getDisplayName(), "reading: not in cache, nothing pending -> start reading");
// Throw away cache, start new read below.
m_contactCache.reset();
}
} else {
SE_LOG_DEBUG(getDisplayName(), "reading: in %s cache", m_contactCache->m_running ? "running" : "loaded");
if (m_contactCache->m_running) {
// The batch read covering this contact has not completed yet:
// count that as a stall and wait until m_running gets cleared
// (done by completedRead()).
m_cacheStalls++;
GRunWhile(boost::lambda::var(m_contactCache->m_running));
}
// Problem?
checkCacheForError(m_contactCache);
SE_LOG_DEBUG(getDisplayName(), "reading: in cache, %s", it->second ? "available" : "not found");
if (it->second) {
// Got it.
// NOTE(review): ref() presumably hands out an additional reference
// which the caller then owns - confirm against EContactCXX.
*contact = it->second.ref();
} else {
// Delay throwing error. We need to go through the read-ahead code below.
gerror.take(g_error_new(E_BOOK_CLIENT_ERROR, E_BOOK_CLIENT_ERROR_CONTACT_NOT_FOUND,
"uid %s not found in batch read", luid.c_str()));
}
}
}
// No current cache? In that case we must read and block.
if (!m_contactCache) {
m_contactCache = startReading(luid, START);
// Call code above recursively, which will block.
return getContactFromCache(luid, contact, gerror);
}
// Can we read ahead?
// Only one read-ahead query at a time, and only after the active cache
// has finished loading. startReading() may return an empty pointer when
// it finds nothing to read after m_lastLUID.
if (!m_contactCacheNext && !m_contactCache->m_running) {
m_contactCacheNext = startReading(m_contactCache->m_lastLUID, CONTINUE);
}
// Everything is okay when we get here. Either we have the contact or
// it wasn't in the database.
SE_LOG_DEBUG(getDisplayName(), "reading: read %s: %s", luid.c_str(), gerror ? gerror->message : "<<okay>>");
logCacheStats(Logger::DEBUG);
return !gerror;
}
/**
 * Maximum number of contacts requested in one batch read: the value
 * of the SYNCEVOLUTION_EDS_BATCH_SIZE env variable (default "50"),
 * parsed with atoi() and never less than 1.
 */
static int MaxBatchSize()
{
    const int requested = atoi(getEnv("SYNCEVOLUTION_EDS_BATCH_SIZE", "50"));
    return requested < 1 ? 1 : requested;
}
/**
 * Selects the next batch of contacts according to the current
 * read-ahead order and starts one asynchronous query for them.
 *
 * @param luid   START: the contact which must be part of the batch;
 *               CONTINUE: the last contact of the previous batch, the
 *               new batch starts with the item following it
 * @param mode   see ReadingMode
 * @return new cache with one (still empty) entry per requested luid and
 *         m_running set, or an empty pointer if there was nothing to read
 *
 * Side effect: in START mode with read-ahead enabled, a luid which is
 * not a candidate according to the current order counts as cache miss
 * and switches m_readAheadOrder to READ_NONE.
 */
boost::shared_ptr<ContactCache> EvolutionContactSource::startReading(const std::string &luid, ReadingMode mode)
{
SE_LOG_DEBUG(getDisplayName(), "reading: %s contact %s",
mode == START ? "must read" :
mode == CONTINUE ? "continue after" :
"???",
luid.c_str());
// Evaluated only once, during the first call.
static int maxBatchSize = MaxBatchSize();
std::vector<EBookQueryCXX> uidQueries;
uidQueries.resize(maxBatchSize);
// Pointers to the selected luids. They point either to the luid
// parameter or into the item lists consulted below; the first "size"
// entries are valid.
std::vector<const std::string *> uids;
uids.resize(maxBatchSize);
int size = 0;
// Set when luid is a valid read-ahead candidate for the current order.
bool found = false;
switch (m_readAheadOrder) {
case READ_ALL_ITEMS:
case READ_CHANGED_ITEMS: {
const Items_t &items = getAllItems();
const Items_t &newItems = getNewItems();
const Items_t &updatedItems = getUpdatedItems();
Items_t::const_iterator it = items.find(luid);
// Always read the requested item, even if not found in item list?
if (mode == START) {
uids[0] = &luid;
size++;
}
// luid is dealt with, either way.
if (it != items.end()) {
// Check that it is a valid candidate for caching, else
// we have a cache miss prediction.
if (m_readAheadOrder == READ_ALL_ITEMS ||
newItems.find(luid) != newItems.end() ||
updatedItems.find(luid) != updatedItems.end()) {
found = true;
}
++it;
}
// Fill up the batch with the items following luid. For
// READ_CHANGED_ITEMS only new and updated items are picked.
while (size < maxBatchSize &&
it != items.end()) {
// Note: shadows the luid parameter inside the loop body.
const std::string &luid = *it;
if (m_readAheadOrder == READ_ALL_ITEMS ||
newItems.find(luid) != newItems.end() ||
updatedItems.find(luid) != updatedItems.end()) {
uids[size] = &luid;
++size;
}
++it;
}
break;
}
case READ_SELECTED_ITEMS: {
// Linear search for luid in the list given to setReadAheadOrder().
ReadAheadItems::const_iterator it = boost::find(std::make_pair(m_nextLUIDs.begin(), m_nextLUIDs.end()), luid);
// Always read the requested item, even if not found in item list?
if (mode == START) {
uids[0] = &luid;
size++;
}
// luid is dealt with, either way.
if (it != m_nextLUIDs.end()) {
found = true;
++it;
}
while (size < maxBatchSize &&
it != m_nextLUIDs.end()) {
uids[size] = &*it;
++size;
++it;
}
break;
}
case READ_NONE:
// May be reached when read-ahead was turned off while
// preparing for it.
if (mode == START) {
uids[0] = &luid;
size++;
}
break;
}
if (m_readAheadOrder != READ_NONE &&
mode == START &&
!found) {
// The requested contact was not on our list. Consider this
// a cache miss (or rather, cache prediction failure) and turn
// off the read-ahead.
m_cacheMisses++;
SE_LOG_DEBUG(getDisplayName(), "reading: disable read-ahead due to cache miss");
m_readAheadOrder = READ_NONE;
}
boost::shared_ptr<ContactCache> cache;
if (size) {
// Prepare parameter for EDS C call. Ownership of query instances is in uidQueries array.
boost::scoped_array<EBookQuery *> queries(new EBookQuery *[size]);
for (int i = 0; i < size; i++) {
// This shouldn't compile because we don't specify how ownership is handled.
// The reset() method always bumps the ref count, which is not what we want here!
// uidQueries[i].reset(e_book_query_field_test(E_CONTACT_UID, E_BOOK_QUERY_IS, it->c_str()));
//
// Take over ownership.
uidQueries[i] = EBookQueryCXX::steal(e_book_query_field_test(E_CONTACT_UID, E_BOOK_QUERY_IS, uids[i]->c_str()));
queries[i] = uidQueries[i].get();
}
// One query which matches any of the selected UIDs.
// NOTE(review): the "false" is presumably the "unref" flag, leaving the
// sub-queries owned by uidQueries - confirm against the EDS documentation.
EBookQueryCXX query(e_book_query_or(size, queries.get(), false), TRANSFER_REF);
PlainGStr sexp(e_book_query_to_string(query.get()));
cache.reset(new ContactCache);
cache->m_running = true;
cache->m_name = StringPrintf("%s-%s (%d)", uids[0]->c_str(), uids[size - 1]->c_str(), size);
cache->m_lastLUID = *uids[size - 1];
// Pre-populate the cache with empty entries for everything requested.
// completedRead() overwrites the entries of the contacts it receives.
BOOST_FOREACH (const std::string *uid, std::make_pair(uids.begin(), uids.begin() + size)) {
(*cache)[*uid] = EContactCXX();
}
m_contactsFromDB += size;
m_contactQueries++;
// The callback only holds a weak pointer: when the cache gets dropped
// before the read completes, completedRead() discards the result.
SYNCEVO_GLIB_CALL_ASYNC(e_book_client_get_contacts,
boost::bind(&EvolutionContactSource::completedRead,
this,
boost::weak_ptr<ContactCache>(cache),
_1, _2, _3),
m_addressbook, sexp, NULL);
SE_LOG_DEBUG(getDisplayName(), "reading: started contact read %s", cache->m_name.c_str());
}
return cache;
}
typedef GListCXX< EContact, GSList, GObjectDestructor<EContact> > ContactListCXX;
/**
 * Completion callback of a batch read started by startReading().
 *
 * @param cachePtr     the cache which is waiting for the result; may have
 *                     been dropped in the meantime, in which case the
 *                     result is discarded
 * @param success      whether the read succeeded
 * @param contactsPtr  contacts returned by the read, owned by this method
 * @param gerror       error information; stored in the cache if the
 *                     read was not successful
 *
 * Exceptions do not leave this method, they are passed to
 * Exception::handle() as fatal.
 */
void EvolutionContactSource::completedRead(const boost::weak_ptr<ContactCache> &cachePtr, gboolean success, GSList *contactsPtr, const GError *gerror) throw()
{
    try {
        // Take over ownership of the result list first, so that it
        // gets released no matter how we leave this method.
        ContactListCXX results(contactsPtr);
        const char *outcome = gerror ? gerror->message : "<<successful>>";

        boost::shared_ptr<ContactCache> target = cachePtr.lock();
        if (!target) {
            SE_LOG_DEBUG(getDisplayName(), "reading: contact read finished, results no longer needed: %s", outcome);
            return;
        }

        SE_LOG_DEBUG(getDisplayName(), "reading: contact read %s finished: %s",
                     target->m_name.c_str(),
                     outcome);
        if (!success) {
            // The whole batch failed, remember why.
            target->m_gerror = gerror;
        } else {
            // Store each returned contact under its UID.
            BOOST_FOREACH (EContact *entry, results) {
                const char *uid = static_cast<const char *>(e_contact_get_const(entry, E_CONTACT_UID));
                SE_LOG_DEBUG(getDisplayName(), "reading: contact read %s got %s", target->m_name.c_str(), uid);
                (*target)[uid] = EContactCXX(entry, ADD_REF);
            }
        }
        // Done, anyone waiting for this cache may continue.
        target->m_running = false;
    } catch (...) {
        Exception::handle(HANDLE_EXCEPTION_FATAL);
    }
}
/**
 * Logs the read-ahead statistics gathered so far at the given level.
 * The miss rate is printed as integer percentage of all contact
 * reads and as 0 when nothing has been read yet.
 */
void EvolutionContactSource::logCacheStats(Logger::Level level)
{
    // Guard against division by zero before the first read.
    const int missPercent = m_contactReads ?
        m_cacheMisses * 100 / m_contactReads :
        0;
    SE_LOG(getDisplayName(), level,
           "requested %d, retrieved %d from DB in %d queries, misses %d/%d (%d%%), stalls %d",
           m_contactReads,
           m_contactsFromDB,
           m_contactQueries,
           m_cacheMisses, m_contactReads, missPercent,
           m_cacheStalls);
}
#endif
void EvolutionContactSource::readItem(const string &luid, std::string &item, bool raw)
{
EContact *contact;
GErrorCXX gerror;
if (
#ifdef USE_EDS_CLIENT
!e_book_client_get_contact_sync(m_addressbook,
luid.c_str(),
&contact,
NULL,
gerror)
!getContact(luid, &contact, gerror)
#else
!e_book_get_contact(m_addressbook,
luid.c_str(),
@ -655,6 +1020,7 @@ EvolutionContactSource::insertItem(const string &uid, const std::string &item, b
const_cast<char *>(uid.c_str()));
GErrorCXX gerror;
#ifdef USE_EDS_CLIENT
invalidateCachedContact(uid);
switch (m_accessMode) {
case SYNCHRONOUS:
if (uid.empty()) {
@ -723,7 +1089,8 @@ void EvolutionContactSource::removeItem(const string &uid)
GErrorCXX gerror;
if (
#ifdef USE_EDS_CLIENT
!e_book_client_remove_contact_by_uid_sync(m_addressbook, uid.c_str(), NULL, gerror)
(invalidateCachedContact(uid),
!e_book_client_remove_contact_by_uid_sync(m_addressbook, uid.c_str(), NULL, gerror))
#else
!e_book_remove_contact(m_addressbook, uid.c_str(), gerror)
#endif
@ -744,11 +1111,9 @@ std::string EvolutionContactSource::getDescription(const string &luid)
GErrorCXX gerror;
if (
#ifdef USE_EDS_CLIENT
!e_book_client_get_contact_sync(m_addressbook,
luid.c_str(),
&contact,
NULL,
gerror)
!getContact(luid,
&contact,
gerror)
#else
!e_book_get_contact(m_addressbook,
luid.c_str(),

View file

@ -42,6 +42,8 @@ SE_GOBJECT_TYPE(EContact)
SE_BEGIN_CXX
class ContactCache;
/**
* Implements access to Evolution address books.
*/
@ -152,6 +154,36 @@ class EvolutionContactSource : public EvolutionSyncSource,
virtual void flushItemChanges();
virtual void finishItemChanges();
// Read-ahead of item data.
/**
 * m_contactCache is the cache which currently serves reads,
 * m_contactCacheNext the one filled by the read-ahead query.
 * Either may be unset.
 */
boost::shared_ptr<ContactCache> m_contactCache, m_contactCacheNext;
/**
 * m_cacheMisses: number of reads of a contact which was not a read-ahead candidate.
 * m_cacheStalls: number of reads which had to wait for a pending batch read.
 */
int m_cacheMisses, m_cacheStalls;
int m_contactReads; /**< number of getContact() calls */
int m_contactsFromDB; /**< number of contacts requested from DB (including ones not found) */
int m_contactQueries; /**< total number of DB queries: e_book_client_get_contacts() batch reads plus single e_book_client_get_contact_sync() reads */
/** current read-ahead order; set to READ_NONE by startReading() after a cache miss */
ReadAheadOrder m_readAheadOrder;
/** the luids passed to setReadAheadOrder() */
ReadAheadItems m_nextLUIDs;
/** throws the error recorded in the cache, if there is one, and drops the cache */
void checkCacheForError(boost::shared_ptr<ContactCache> &cache);
/** removes the contact from both caches */
void invalidateCachedContact(const std::string &luid);
/** removes the contact from the given cache, which may be unset */
void invalidateCachedContact(boost::shared_ptr<ContactCache> &cache, const std::string &luid);
/** reads a contact either directly or via getContactFromCache(), depending on access mode and read-ahead order */
bool getContact(const string &luid, EContact **contact, GErrorCXX &gerror);
/** reads a contact via the caches, starting batch reads as needed */
bool getContactFromCache(const string &luid, EContact **contact, GErrorCXX &gerror);
/** how startReading() interprets the luid passed to it */
enum ReadingMode
{
START, /**< luid is needed, must be read */
CONTINUE /**< luid is from old request, find next ones */
};
/** starts an asynchronous batch read; returns an unset pointer if there is nothing to read */
boost::shared_ptr<ContactCache> startReading(const std::string &luid, ReadingMode mode);
/** completion callback for the batch read started by startReading() */
void completedRead(const boost::weak_ptr<ContactCache> &cachePtr, gboolean success, GSList *contactsPtr, const GError *gerror) throw();
/** logs the counters above at the given level */
void logCacheStats(Logger::Level level);
// Use the information provided to us to implement read-ahead efficiently.
virtual void setReadAheadOrder(ReadAheadOrder order,
const ReadAheadItems &luids);
virtual void getReadAheadOrder(ReadAheadOrder &order,
ReadAheadItems &luids);
#else
eptr<EBook, GObject> m_addressbook;
#endif