SyncSource: optional support for asynchronous insert/update/delete

The wrapper around the actual operation checks if the operation
returned an error or result code (traditional behavior). If not, it
expects a ContinueOperation instance, remembers it and calls it when
the same operation gets called again for the same item.

For add/insert, "same item" is detected based on the KeyH address,
which must not change. For delete, the item local ID is used.

Pre- and post-signals are called exactly once, before the first call
and after the last call of the item.

ContinueOperation is a simple boost::function pointer for now. The
Synthesis engine itself is not able to force completion of the
operation, it just polls. This can lead to many empty messages with
just an Alert inside, thus triggering the "endless loop" protection,
which aborts the sync.

We overcome this limitation in the SyncEvolution layer above the
Synthesis engine: first, we flush pending operations before starting
network IO. This is a good place to batch together all pending
operations. Second, to overcome the "endless loop" problem, we force
a waiting for completion if the last message already was empty. If
that happened, we are done with items and should start sending our
responses.

Binding a function which returns the traditional TSyError still works
because it gets copied transparently into the boost::variant that the
wrapper expects, so no other code in SyncSource or backends needs to
be adapted. Enabling the use of LOCERR_AGAIN in the utility classes
and backends will follow in the next patches.
This commit is contained in:
Patrick Ohly 2013-06-05 17:22:00 +02:00
parent b072af3dc1
commit 856295b1e1
3 changed files with 365 additions and 14 deletions

View File

@ -1061,6 +1061,7 @@ public:
using inherited::end;
using inherited::rbegin;
using inherited::rend;
using inherited::size;
/** transfers ownership (historic reasons for storing plain pointer...) */
void addSource(cxxptr<SyncSource> &source) { checkSource(source); push_back(source.release()); }
@ -1811,6 +1812,7 @@ void SyncContext::displaySourceProgress(sysync::TProgressEventEnum type,
SE_LOG_INFO(NULL, "%s: received %d",
source.getDisplayName().c_str(), extra1);
}
source.recordTotalNumItemsReceived(extra1);
break;
case sysync::PEV_ITEMSENT:
/* item sent, extra1=current item count,
@ -3704,6 +3706,7 @@ SyncMLStatus SyncContext::doSync()
Timespec sendStart, resendStart;
int requestNum = 0;
sysync::uInt16 previousStepCmd = stepCmd;
std::vector<int> numItemsReceived; // source->getTotalNumItemsReceived() for each source, see STEPCMD_SENDDATA
do {
try {
// check for suspend, if so, modify step command for next step
@ -3880,6 +3883,34 @@ SyncMLStatus SyncContext::doSync()
m_retries = 0;
break;
case sysync::STEPCMD_SENDDATA: {
// We'll be busy for a while with network IO, so give
// sources a chance to do some work in parallel.
if (m_sourceListPtr) {
bool needResults = true;
if (numItemsReceived.size() < m_sourceListPtr->size()) {
numItemsReceived.insert(numItemsReceived.end(),
m_sourceListPtr->size() - numItemsReceived.size(),
0);
}
for (size_t i = 0; i < numItemsReceived.size(); i++) {
SyncSource *source = (*m_sourceListPtr->getSourceSet())[i];
int received = source->getTotalNumItemsReceived();
SE_LOG_DEBUG(source->getDisplayName(), "total number of items received %d",
received);
if (numItemsReceived[i] != received) {
numItemsReceived[i] = received;
needResults = false;
}
}
BOOST_FOREACH (SyncSource *source, *m_sourceListPtr) {
source->flush();
if (needResults) {
source->finish();
}
}
}
// send data to remote
SharedKey sessionKey = m_engine.OpenSessionKey(session);

View File

@ -334,6 +334,7 @@ class SyncSourceReport {
public:
SyncSourceReport() {
memset(m_stat, 0, sizeof(m_stat));
m_received = 0;
m_first =
m_resume = false;
m_mode = SYNC_NONE;
@ -421,6 +422,10 @@ class SyncSourceReport {
void recordStatus(SyncMLStatus status ) { m_status = status; }
SyncMLStatus getStatus() const { return m_status; }
/** counts each received add/update/delete */
void recordTotalNumItemsReceived(int received) { m_received = received; }
int getTotalNumItemsReceived() const { return m_received; }
/**
* if not empty, then this was the virtual source which cause the
* current one to be included in the sync
@ -434,6 +439,7 @@ class SyncSourceReport {
private:
/** storage for getItemStat(): allow access with _MAX as index */
int m_stat[ITEM_LOCATION_MAX + 1][ITEM_STATE_MAX + 1][ITEM_RESULT_MAX + 1];
int m_received;
SyncMode m_mode;
int m_restarts;

View File

@ -31,6 +31,7 @@
#include <boost/function.hpp>
#include <boost/signals2.hpp>
#include <boost/variant.hpp>
#include <syncevo/declarations.h>
SE_BEGIN_CXX
@ -693,12 +694,55 @@ class OperationSlotInvoker {
};
/**
* helper class, needs to be specialized based on number of parameters
* Helper class for looking up a pending operation by a Synthesis parameter.
* KeyH (add, replace) and item ID (delete) are supported.
*/
template<class F, int arity> class OperationWrapperSwitch;
template<class A1> struct KeyConverter;
/** one parameter */
template<class F> class OperationWrapperSwitch<F, 0>
/**
* For KeyH we make the assumption that the key exists as long
* as the pending operation, and thus its address can be used as
* unique identifier for the operation.
*/
template<> struct KeyConverter<sysync::KeyH>
{
typedef void * key_type;
static key_type toKey(sysync::KeyH key) { return static_cast<void *>(key); }
};
/**
* For cItemID we just use the item ID as string.
*/
template<> struct KeyConverter<sysync::cItemID>
{
typedef std::string key_type;
static key_type toKey(sysync::cItemID id) { return id->item; }
};
/**
* To be returned by a function wrapped by OperationWrapper
* when the function is not done yet and wants to be called again
* for the same item.
*/
template <class F> class ContinueOperation : public boost::function<F>
{
public:
ContinueOperation()
{}
ContinueOperation(const boost::function<F> &callback) :
boost::function<F>(callback)
{}
};
/**
* Helper class, needs to be specialized based on number of parameters
* and return type.
*/
template<class F, int arity, class R> class OperationWrapperSwitch;
/** one parameter, sysync::TSyError type */
template<class F> class OperationWrapperSwitch<F, 0, sysync::TSyError>
{
public:
typedef sysync::TSyError result_type;
@ -781,7 +825,7 @@ template<class F> class OperationWrapperSwitch<F, 0>
PostSignal m_post;
};
template<class F> class OperationWrapperSwitch<F, 1>
template<class F> class OperationWrapperSwitch<F, 1, sysync::TSyError>
{
public:
typedef sysync::TSyError result_type;
@ -833,7 +877,83 @@ template<class F> class OperationWrapperSwitch<F, 1>
PostSignal m_post;
};
template<class F> class OperationWrapperSwitch<F, 2>
template<class F, class V> class OperationWrapperSwitch<F, 1, V>
{
public:
typedef sysync::TSyError result_type;
typedef boost::function<F> OperationType;
typedef typename boost::function<F>::arg1_type arg1_type;
typedef boost::signals2::signal<void (SyncSource &, arg1_type a1),
OperationSlotInvoker> PreSignal;
typedef boost::signals2::signal<void (SyncSource &, OperationExecution, sysync::TSyError,
arg1_type a1),
OperationSlotInvoker> PostSignal;
typedef KeyConverter<arg1_type> Converter;
typedef ContinueOperation<sysync::TSyError (arg1_type)> Continue;
typedef std::map<typename Converter::key_type, Continue> Pending;
sysync::TSyError operator () (SyncSource &source,
arg1_type a1) const throw ()
{
sysync::TSyError res;
OperationExecution exec;
// Marking m_pending "volatile" didn't work, find() not defined for that.
typename Pending::iterator it = const_cast<Pending &>(m_pending).find(Converter::toKey(a1));
bool continuing = it != m_pending.end();
res = continuing ? sysync::LOCERR_OK : m_pre(source, a1);
if (res != sysync::LOCERR_OK) {
exec = OPERATION_SKIPPED;
} else {
if (continuing) {
res = it->second(a1);
if (res != sysync::LOCERR_AGAIN) {
const_cast<Pending &>(m_pending).erase(it);
}
} else if (m_operation) {
try {
V newres = m_operation(a1);
sysync::TSyError *completed = boost::get<sysync::TSyError>(&newres);
if (completed) {
res = *completed;
} else {
res = sysync::LOCERR_AGAIN;
Continue cont = boost::get<Continue>(newres);
const_cast<Pending &>(m_pending).insert(std::make_pair(Converter::toKey(a1), cont));
}
exec = OPERATION_FINISHED;
} catch (...) {
res = Exception::handle(/* source */);
exec = OPERATION_EXCEPTION;
}
} else {
res = sysync::LOCERR_NOTIMP;
exec = OPERATION_EMPTY;
}
}
if (res != sysync::LOCERR_AGAIN) {
sysync::TSyError newres = m_post(source, exec, res, a1);
if (newres != sysync::LOCERR_OK) {
res = newres;
}
}
return res == STATUS_FATAL ? STATUS_DATASTORE_FAILURE : res;
}
PreSignal &getPreSignal() const { return const_cast<PreSignal &>(m_pre); }
PostSignal &getPostSignal() const { return const_cast<PostSignal &>(m_post); }
protected:
OperationType m_operation;
private:
PreSignal m_pre;
PostSignal m_post;
Pending m_pending;
};
template<class F> class OperationWrapperSwitch<F, 2, sysync::TSyError>
{
public:
typedef sysync::TSyError result_type;
@ -886,7 +1006,84 @@ template<class F> class OperationWrapperSwitch<F, 2>
PostSignal m_post;
};
template<class F> class OperationWrapperSwitch<F, 3>
template<class F, class V> class OperationWrapperSwitch<F, 2, V>
{
public:
typedef sysync::TSyError result_type;
typedef boost::function<F> OperationType;
typedef typename boost::function<F>::arg1_type arg1_type;
typedef typename boost::function<F>::arg2_type arg2_type;
typedef boost::signals2::signal<void (SyncSource &, arg1_type a1, arg2_type a2),
OperationSlotInvoker> PreSignal;
typedef boost::signals2::signal<void (SyncSource &, OperationExecution, sysync::TSyError,
arg1_type a1, arg2_type a2),
OperationSlotInvoker> PostSignal;
typedef KeyConverter<arg1_type> Converter;
typedef ContinueOperation<sysync::TSyError (arg1_type, arg2_type)> Continue;
typedef std::map<typename Converter::key_type, Continue> Pending;
sysync::TSyError operator () (SyncSource &source,
arg1_type a1, arg2_type a2) const throw ()
{
sysync::TSyError res;
OperationExecution exec;
// Marking m_pending "volatile" didn't work, find() not defined for that.
typename Pending::iterator it = const_cast<Pending &>(m_pending).find(Converter::toKey(a1));
bool continuing = it != m_pending.end();
res = continuing ? sysync::LOCERR_OK : m_pre(source, a1, a2);
if (res != sysync::LOCERR_OK) {
exec = OPERATION_SKIPPED;
} else {
if (continuing) {
res = it->second(a1, a2);
if (res != sysync::LOCERR_AGAIN) {
const_cast<Pending &>(m_pending).erase(it);
}
} else if (m_operation) {
try {
V newres = m_operation(a1, a2);
sysync::TSyError *completed = boost::get<sysync::TSyError>(&newres);
if (completed) {
res = *completed;
} else {
res = sysync::LOCERR_AGAIN;
Continue cont = boost::get<Continue>(newres);
const_cast<Pending &>(m_pending).insert(std::make_pair(Converter::toKey(a1), cont));
}
exec = OPERATION_FINISHED;
} catch (...) {
res = Exception::handle(/* source */);
exec = OPERATION_EXCEPTION;
}
} else {
res = sysync::LOCERR_NOTIMP;
exec = OPERATION_EMPTY;
}
}
if (res != sysync::LOCERR_AGAIN) {
sysync::TSyError newres = m_post(source, exec, res, a1, a2);
if (newres != sysync::LOCERR_OK) {
res = newres;
}
}
return res == STATUS_FATAL ? STATUS_DATASTORE_FAILURE : res;
}
PreSignal &getPreSignal() const { return const_cast<PreSignal &>(m_pre); }
PostSignal &getPostSignal() const { return const_cast<PostSignal &>(m_post); }
protected:
OperationType m_operation;
private:
PreSignal m_pre;
PostSignal m_post;
Pending m_pending;
};
template<class F> class OperationWrapperSwitch<F, 3, sysync::TSyError>
{
public:
typedef sysync::TSyError result_type;
@ -941,16 +1138,103 @@ template<class F> class OperationWrapperSwitch<F, 3>
};
template<class F, class V> class OperationWrapperSwitch<F, 3, V>
{
public:
typedef sysync::TSyError result_type;
typedef boost::function<F> OperationType;
typedef typename boost::function<F>::arg1_type arg1_type;
typedef typename boost::function<F>::arg2_type arg2_type;
typedef typename boost::function<F>::arg3_type arg3_type;
typedef boost::signals2::signal<void (SyncSource &, arg1_type a1, arg2_type a2, arg3_type a3),
OperationSlotInvoker> PreSignal;
typedef boost::signals2::signal<void (SyncSource &, OperationExecution, sysync::TSyError,
arg1_type a1, arg2_type a2, arg3_type a3),
OperationSlotInvoker> PostSignal;
typedef KeyConverter<arg1_type> Converter;
typedef ContinueOperation<sysync::TSyError (arg1_type, arg2_type, arg3_type)> Continue;
typedef std::map<typename Converter::key_type, Continue> Pending;
sysync::TSyError operator () (SyncSource &source,
arg1_type a1, arg2_type a2, arg3_type a3) const throw ()
{
sysync::TSyError res;
OperationExecution exec;
// Marking m_pending "volatile" didn't work, find() not defined for that.
typename Pending::iterator it = const_cast<Pending &>(m_pending).find(Converter::toKey(a1));
bool continuing = it != m_pending.end();
res = continuing ? sysync::LOCERR_OK : m_pre(source, a1, a2, a3);
if (res != sysync::LOCERR_OK) {
exec = OPERATION_SKIPPED;
} else {
if (continuing) {
res = it->second(a1, a2, a3);
if (res != sysync::LOCERR_AGAIN) {
const_cast<Pending &>(m_pending).erase(it);
}
} else if (m_operation) {
try {
V newres = m_operation(a1, a2, a3);
sysync::TSyError *completed = boost::get<sysync::TSyError>(&newres);
if (completed) {
res = *completed;
} else {
res = sysync::LOCERR_AGAIN;
Continue cont = boost::get<Continue>(newres);
const_cast<Pending &>(m_pending).insert(std::make_pair(Converter::toKey(a1), cont));
}
exec = OPERATION_FINISHED;
} catch (...) {
res = Exception::handle(/* source */);
exec = OPERATION_EXCEPTION;
}
} else {
res = sysync::LOCERR_NOTIMP;
exec = OPERATION_EMPTY;
}
}
if (res != sysync::LOCERR_AGAIN) {
sysync::TSyError newres = m_post(source, exec, res, a1, a2, a3);
if (newres != sysync::LOCERR_OK) {
res = newres;
}
}
return res == STATUS_FATAL ? STATUS_DATASTORE_FAILURE : res;
}
PreSignal &getPreSignal() const { return const_cast<PreSignal &>(m_pre); }
PostSignal &getPostSignal() const { return const_cast<PostSignal &>(m_post); }
protected:
OperationType m_operation;
private:
PreSignal m_pre;
PostSignal m_post;
Pending m_pending;
};
/**
* This mimics a boost::function with the same signature. The function
* signature F must have a sysync::TSyError error return code, as in most
* of the Synthesis DB API.
* of the Synthesis DB API, or a boost::variant of sysync::TSyError and
* ContinueOperation<F>.
*
* Specializations of this class for operations with different number
* of parameters provide a call operator which invokes a pre- and
* post-signal around the actual implementation. See
* OperationWrapperSwitch<F, 0> for details.
*
* If the function returns a variant with a ContinueOperation<F> inside,
* the OperationWrapper will store that callback for the current
* set of parameters (currently using only the first one as key),
* then when called again, skip the pre-signal and invoke the callback
* instead of the original operation. That callback may return LOCERR_AGAIN
* to request being called again the same way. The post-signal is
* called when the operation finally completes.
*
* Additional operations could be wrapped by providing more
* specializations (different return code, more parameters). The
* number or parameters in the operation cannot exceed six, because
@ -959,9 +1243,9 @@ template<class F> class OperationWrapperSwitch<F, 3>
* supported arguments in boost::signals2/boost::function.
*/
template<class F> class OperationWrapper :
public OperationWrapperSwitch<F, boost::function<F>::arity>
public OperationWrapperSwitch<F, boost::function<F>::arity, typename boost::function<F>::result_type>
{
typedef OperationWrapperSwitch<F, boost::function<F>::arity> inherited;
typedef OperationWrapperSwitch<F, boost::function<F>::arity, typename boost::function<F>::result_type> inherited;
public:
/** operation implemented? */
operator bool () const { return inherited::m_operation; }
@ -976,7 +1260,6 @@ public OperationWrapperSwitch<F, boost::function<F>::arity>
}
};
/**
* abstract base class for SyncSource with some common functionality
* and no data
@ -1247,6 +1530,9 @@ class SyncSourceBase {
typedef OperationWrapper<sysync::TSyError ()> StartDataWrite_t;
StartDataWrite_t m_startDataWrite;
typedef OperationWrapper<sysync::TSyError (sysync::cItemID aID, sysync::ItemID updID)> FinalizeLocalID_t;
FinalizeLocalID_t m_finalizeLocalID;
typedef OperationWrapper<sysync::TSyError (bool success, char **newToken)> EndDataWrite_t;
EndDataWrite_t m_endDataWrite;
@ -1259,13 +1545,19 @@ class SyncSourceBase {
typedef OperationWrapper<sysync::TSyError (sysync::cItemID aID, sysync::KeyH aItemKey)> ReadItemAsKey_t;
ReadItemAsKey_t m_readItemAsKey;
typedef OperationWrapper<sysync::TSyError (sysync::KeyH aItemKey, sysync::ItemID newID)> InsertItemAsKey_t;
typedef ContinueOperation<sysync::TSyError (sysync::KeyH aItemKey, sysync::ItemID newID)> InsertItemAsKeyContinue_t;
typedef boost::variant<sysync::TSyError, InsertItemAsKeyContinue_t> InsertItemAsKeyResult_t;
typedef OperationWrapper<InsertItemAsKeyResult_t (sysync::KeyH aItemKey, sysync::ItemID newID)> InsertItemAsKey_t;
InsertItemAsKey_t m_insertItemAsKey;
typedef OperationWrapper<sysync::TSyError (sysync::KeyH aItemKey, sysync::cItemID aID, sysync::ItemID updID)> UpdateItemAsKey_t;
typedef ContinueOperation<sysync::TSyError (sysync::KeyH aItemKey, sysync::cItemID aID, sysync::ItemID updID)> UpdateItemAsKeyContinue_t;
typedef boost::variant<sysync::TSyError, UpdateItemAsKeyContinue_t> UpdateItemAsKeyResult_t;
typedef OperationWrapper<UpdateItemAsKeyResult_t (sysync::KeyH aItemKey, sysync::cItemID aID, sysync::ItemID updID)> UpdateItemAsKey_t;
UpdateItemAsKey_t m_updateItemAsKey;
typedef OperationWrapper<sysync::TSyError (sysync::cItemID aID)> DeleteItem_t;
typedef ContinueOperation<sysync::TSyError (sysync::cItemID aID)> DeleteItemContinue_t;
typedef boost::variant<sysync::TSyError, DeleteItemContinue_t> DeleteItemResult_t;
typedef OperationWrapper<DeleteItemResult_t (sysync::cItemID aID)> DeleteItem_t;
DeleteItem_t m_deleteItem;
/**@}*/
@ -1541,6 +1833,28 @@ class SyncSource : virtual public SyncSourceBase, public SyncSourceConfig, publi
*/
virtual void open() = 0;
/**
* Start flushing item modifications which were not executed right
* away. Item modifications (add/update/delete) can be delayed by
* returning LOCERR_AGAIN or, when using for example
* SyncSourceSerialize aka TrackingSyncSource, by returning a "check"
* function instead of the final result.
*
* The sync engine calls this method after processing each incoming
* SyncML message.
*/
virtual void flush() {}
/**
* Called after flush() to ensure that all pending modifications
* have completed. Called when the engine needs the results.
*
* Called by the sync engine when the SyncML peer ran out of new
* item changes. At that time we would start sending back and forth
* empty messages, unless we can provide results.
*/
virtual void finish() {}
/**
* Returns the actual database that is in use. open() must
* have been called first.