Transport: bug#3427, transport will retry send after long waiting and will stop retring finally if all fails.

After a http request is sent out, while the client is waiting for response,
the server found client is unreacheable (client side network down temporarily or
intermidate router down), so the server stops sending and closed the tcp
conenction, causing a half closed connection at client side.

To fix this, client side will timeout (60 seconds) during wating and will
resend previous request to retry. After 3 failed retries, it will finally
give up leading to an abort. The timeout and retry count can be customized
in server config file.
This commit is contained in:
Chen Congwu 2009-07-22 16:44:06 +08:00
parent c0996c491d
commit ec9941ece5
11 changed files with 278 additions and 10 deletions

View file

@ -31,6 +31,7 @@ CurlTransportAgent::CurlTransportAgent() :
m_easyHandle(easyInit()),
m_slist(NULL),
m_status(INACTIVE),
m_cb(NULL),
m_reply(NULL),
m_replyLen(0),
m_replySize(0)
@ -139,6 +140,13 @@ void CurlTransportAgent::setSSL(const std::string &cacerts,
checkCurl(code);
}
void CurlTransportAgent::setCallback (TransportCallback cb, void *udata, int interval)
{
m_cb = cb;
m_cbData = udata;
m_cbInterval = interval;
}
void CurlTransportAgent::send(const char *data, size_t len)
{
CURLcode code;
@ -164,13 +172,21 @@ void CurlTransportAgent::send(const char *data, size_t len)
m_slist = curl_slist_append(m_slist, contentHeader.c_str());
m_status = ACTIVE;
if ((code = curl_easy_setopt(m_easyHandle, CURLOPT_HTTPHEADER, m_slist)) ||
if(m_cb){
m_elapsed = 0;
}
m_aborting = false;
if ((code = curl_easy_setopt(m_easyHandle, CURLOPT_PROGRESSDATA, static_cast<void *> (this)))||
(code = curl_easy_setopt(m_easyHandle, CURLOPT_HTTPHEADER, m_slist)) ||
(code = curl_easy_setopt(m_easyHandle, CURLOPT_POSTFIELDSIZE, len)) ||
(code = curl_easy_perform(m_easyHandle))) {
((code = curl_easy_perform(m_easyHandle)) && ((code != CURLE_ABORTED_BY_CALLBACK)||m_aborting))
){
m_status = CANCELED;
checkCurl(code);
}
m_status = GOT_REPLY;
if(code != CURLE_ABORTED_BY_CALLBACK) {
m_status = GOT_REPLY;
}
}
void CurlTransportAgent::cancel()
@ -247,12 +263,32 @@ void CurlTransportAgent::checkCurl(CURLcode code)
}
}
int CurlTransportAgent::progressCallback(void*, double, double, double, double)
int CurlTransportAgent::progressCallback(void* transport, double, double, double, double)
{
CurlTransportAgent *agent = static_cast<CurlTransportAgent *> (transport);
SuspendFlags& s_flags = EvolutionSyncClient::getSuspendFlags();
//abort transfer
if (s_flags.state == SuspendFlags::CLIENT_ABORT)
if (s_flags.state == SuspendFlags::CLIENT_ABORT){
agent->setAborting (true);
return -1;
}
return agent->processCallback();
}
int CurlTransportAgent::processCallback()
{
if (m_cb){
if (++m_elapsed > m_cbInterval){
m_elapsed = 0;
bool cont = m_cb (m_cbData);
if (cont) {
m_status = TIME_OUT;
}else {
m_aborting = true;
}
return -1;
}
}
return 0;
}

View file

@ -53,12 +53,21 @@ class CurlTransportAgent : public TransportAgent
virtual void cancel();
virtual Status wait();
virtual void getReply(const char *&data, size_t &len, std::string &contentType);
virtual void setCallback (TransportCallback cb, void * udata, int interval);
int processCallback();
void setAborting(bool aborting) {m_aborting = aborting;}
private:
CURL *m_easyHandle;
curl_slist *m_slist;
std::string m_contentType;
Status m_status;
bool m_aborting;
TransportCallback m_cb;
void *m_cbData;
int m_elapsed;
int m_cbInterval;
/**
* libcurl < 7.17.0 does not copy strings passed into curl_easy_setopt().

View file

@ -1158,6 +1158,23 @@ void EvolutionSyncClient::initSources(SourceList &sourceList)
}
}
bool EvolutionSyncClient::transport_cb (void *udata)
{
return static_cast <EvolutionSyncClient *> (udata) -> processTransportCb();
}
bool EvolutionSyncClient::processTransportCb()
{
if(++m_retries > m_retryCount)
{
SE_LOG_INFO(NULL, NULL, "Transport: give up after %d tries", m_retries-1);
return false;
}
SE_LOG_INFO(NULL, NULL, "Transport: retry send #%d after %d seconds timeout", m_retries, m_timeout);
return true;
}
// XML configuration converted to C string constant
extern "C" {
extern const char *SyncEvolutionXML;
@ -1613,6 +1630,8 @@ SyncMLStatus EvolutionSyncClient::doSync()
} catch (NoSuchKey error) {
}
m_timeout = getReqTimeout();
m_retryCount = getReqRetries();
// run an HTTP client sync session
boost::shared_ptr<TransportAgent> agent(createTransportAgent());
if (getUseProxy()) {
@ -1650,6 +1669,8 @@ SyncMLStatus EvolutionSyncClient::doSync()
// parameter STEPCMD_ABORT -> abort session as soon as possible.
bool aborting = false;
int suspending = 0;
m_retries = 0;
bool resend = false;
sysync::uInt16 previousStepCmd = stepCmd;
do {
try {
@ -1691,6 +1712,7 @@ SyncMLStatus EvolutionSyncClient::doSync()
suspending++;
}
}
m_engine.SessionStep(session, stepCmd, &progressInfo);
//During suspention we actually insert a STEPCMD_SUSPEND cmd
//Should restore to the original step here
@ -1754,6 +1776,7 @@ SyncMLStatus EvolutionSyncClient::doSync()
// tbd: close communication channel if still open to make sure it is
// re-opened for the next request
stepCmd = sysync::STEPCMD_STEP;
m_retries = 0;
break;
case sysync::STEPCMD_SENDDATA: {
// send data to remote
@ -1768,13 +1791,20 @@ SyncMLStatus EvolutionSyncClient::doSync()
"contenttype");
agent->setContentType(s);
sessionKey.reset();
//register transport callback
agent->setCallback (transport_cb, this, m_timeout);
// use GetSyncMLBuffer()/RetSyncMLBuffer() to access the data to be
// sent or have it copied into caller's buffer using
// ReadSyncMLBuffer(), then send it to the server
sendBuffer = m_engine.GetSyncMLBuffer(session, true);
if(!resend) {
sendBuffer = m_engine.GetSyncMLBuffer(session, true);
}else {
SE_LOG_INFO (NULL, NULL, "EvolutionSyncClient: resend previous request");
}
agent->send(sendBuffer.get(), sendBuffer.size());
stepCmd = sysync::STEPCMD_SENTDATA; // we have sent the data
resend = false;
break;
}
case sysync::STEPCMD_NEEDDATA:
@ -1782,7 +1812,12 @@ SyncMLStatus EvolutionSyncClient::doSync()
case TransportAgent::ACTIVE:
stepCmd = sysync::STEPCMD_SENTDATA; // still sending the data?!
break;
case TransportAgent::TIME_OUT:
stepCmd = sysync::STEPCMD_RESENDDATA;
resend = true;
break;
case TransportAgent::GOT_REPLY: {
m_retries = 0;
sendBuffer.reset();
const char *reply;
size_t replylen;

View file

@ -498,6 +498,13 @@ class EvolutionSyncClient : public EvolutionSyncConfig, public ConfigUserInterfa
* override sync mode of all active sync sources if set
*/
string m_overrideMode;
};
int m_retries;
int m_timeout;
int m_retryCount;
public:
static bool transport_cb (void *data);
bool processTransportCb();
};
#endif // INCL_EVOLUTIONSYNCCLIENT

View file

@ -40,6 +40,8 @@ SoupTransportAgent::SoupTransportAgent(GMainLoop *loop) :
"Soup main loop"),
m_status(INACTIVE),
m_abortEventSource(-1),
m_cbEventSource(-1),
m_cb(NULL),
m_response(NULL)
{
#ifdef HAVE_LIBSOUP_SOUP_GNOME_FEATURES_H
@ -101,6 +103,13 @@ void SoupTransportAgent::setUserAgent(const std::string &agent)
NULL);
}
void SoupTransportAgent::setCallback (TransportCallback cb, void *data, int interval)
{
m_cb = cb;
m_cbData = data;
m_cbInterval = interval;
}
void SoupTransportAgent::send(const char *data, size_t len)
{
// ownership is transferred to libsoup in soup_session_queue_message()
@ -126,6 +135,10 @@ void SoupTransportAgent::send(const char *data, size_t len)
SOUP_MEMORY_TEMPORARY, data, len);
m_status = ACTIVE;
m_abortEventSource = g_timeout_add_seconds(ABORT_CHECK_INTERVAL, (GSourceFunc) AbortCallback, static_cast<gpointer> (this));
if(m_cb){
m_message = message.get();
m_cbEventSource = g_timeout_add_seconds(m_cbInterval, (GSourceFunc) TimeoutCallback, static_cast<gpointer> (this));
}
soup_session_queue_message(m_session.get(), message.release(),
SessionCallback, static_cast<gpointer>(this));
}
@ -155,6 +168,10 @@ TransportAgent::Status SoupTransportAgent::wait()
break;
}
/** For a canceled message, does not check the return status */
if(m_status == TIME_OUT){
m_failure.clear();
}
if (!m_failure.empty()) {
std::string failure;
std::swap(failure, m_failure);
@ -162,6 +179,7 @@ TransportAgent::Status SoupTransportAgent::wait()
}
g_source_remove(m_abortEventSource);
g_source_remove(m_cbEventSource);
return m_status;
}
@ -231,6 +249,25 @@ gboolean SoupTransportAgent::AbortCallback(gpointer transport)
return TRUE;
}
gboolean SoupTransportAgent::processCallback()
{
bool cont = m_cb(m_cbData);
if(cont){
//stop the message processing and mark status as timeout
guint message_status;
soup_session_cancel_message(m_session.get(), m_message, message_status);
m_status = TIME_OUT;
}else {
cancel();
}
return FALSE;
}
gboolean SoupTransportAgent::TimeoutCallback(gpointer transport)
{
SoupTransportAgent * sTransport = static_cast<SoupTransportAgent *>(transport);
return sTransport->processCallback();
}
} // namespace SyncEvolution
#endif // ENABLE_LIBSOUP

View file

@ -68,7 +68,8 @@ class SoupTransportAgent : public TransportAgent
virtual void cancel();
virtual Status wait();
virtual void getReply(const char *&data, size_t &len, std::string &contentType);
virtual void setCallback (TransportCallback cb, void *udata, int interval);
gboolean processCallback();
private:
std::string m_proxyUser;
std::string m_proxyPassword;
@ -82,10 +83,18 @@ class SoupTransportAgent : public TransportAgent
std::string m_failure;
guint m_abortEventSource;
SoupMessage *m_message;
guint m_cbEventSource;
TransportCallback m_cb;
int m_cbInterval;
void *m_cbData;
/** User Abort check interval */
static const gint ABORT_CHECK_INTERVAL = 1;
/** This function is called regularly to check user abort event */
static gboolean AbortCallback (gpointer data);
/** This function is called regularly to detect timeout */
static gboolean TimeoutCallback (gpointer data);
/** response, copied from SoupMessage */
eptr<SoupBuffer, SoupBuffer, GLibUnref> m_response;

View file

@ -497,6 +497,11 @@ static BoolConfigProperty syncPropPrintChanges("printChanges",
"enables or disables the detailed (and sometimes slow) comparison\n"
"of database content before and after a sync session",
"1");
static IntConfigProperty syncPropReqTimeout("RequestTimeout",
"The time client waiting for a SyncML response after a sucessful request",
"60");
static IntConfigProperty syncPropReqRetries("RequestRetries",
"How many times the client will retry for a timeout, sync will abort after all failed retries", "3");
static ConfigProperty syncPropSSLServerCertificates("SSLServerCertificates",
"A string specifying the location of the certificates\n"
"used to authenticate the server. When empty, the\n"
@ -559,6 +564,8 @@ ConfigPropertyRegistry &EvolutionSyncConfig::getRegistry()
registry.push_back(&syncPropProxyUsername);
registry.push_back(&syncPropProxyPassword);
registry.push_back(&syncPropClientAuthType);
registry.push_back(&syncPropReqTimeout);
registry.push_back(&syncPropReqRetries);
registry.push_back(&syncPropDevID);
syncPropDevID.setObligatory(true);
registry.push_back(&syncPropWBXML);
@ -665,6 +672,10 @@ int EvolutionSyncConfig::getMaxLogDirs() const { return syncPropMaxLogDirs.getPr
void EvolutionSyncConfig::setMaxLogDirs(int value, bool temporarily) { syncPropMaxLogDirs.setProperty(*m_configNode, value, temporarily); }
int EvolutionSyncConfig::getLogLevel() const { return syncPropLogLevel.getProperty(*m_configNode); }
void EvolutionSyncConfig::setLogLevel(int value, bool temporarily) { syncPropLogLevel.setProperty(*m_configNode, value, temporarily); }
int EvolutionSyncConfig::getReqTimeout() const {return syncPropReqTimeout.getProperty(*m_configNode);}
void EvolutionSyncConfig::setReqTimeout(int value, bool temporarily) {syncPropReqTimeout.setProperty(*m_configNode, value, temporarily);}
int EvolutionSyncConfig::getReqRetries() const {return syncPropReqRetries.getProperty(*m_configNode);}
void EvolutionSyncConfig::setReqRetries(int value, bool temporarily) {return syncPropReqRetries.setProperty(*m_configNode,value,temporarily);}
bool EvolutionSyncConfig::getPrintChanges() const { return syncPropPrintChanges.getProperty(*m_configNode); }
void EvolutionSyncConfig::setPrintChanges(bool value, bool temporarily) { syncPropPrintChanges.setProperty(*m_configNode, value, temporarily); }
std::string EvolutionSyncConfig::getWebURL() const { return syncPropWebURL.getProperty(*m_configNode); }

View file

@ -721,6 +721,10 @@ class EvolutionSyncConfig {
virtual void setSSLVerifyServer(bool value, bool temporarily = false);
virtual bool getSSLVerifyHost() const;
virtual void setSSLVerifyHost(bool value, bool temporarily = false);
virtual int getReqTimeout() const;
virtual void setReqTimeout(int value, bool temporarily = false);
virtual int getReqRetries() const;
virtual void setReqRetries(int value, bool temporarily = false);
virtual bool getCompression() const;
virtual void setCompression(bool value, bool temporarily = false);
virtual unsigned int getResponseTimeout() const { return 0; }

View file

@ -41,6 +41,7 @@ namespace SyncEvolution {
class TransportAgent
{
public:
typedef bool (*TransportCallback) (void *udata);
/**
* set transport specific URL of next message
*/
@ -117,6 +118,10 @@ class TransportAgent
* sending message has failed
*/
FAILED,
/**
* transport timeout
*/
TIME_OUT,
/**
* unused transport, configure and use send()
*/
@ -130,6 +135,13 @@ class TransportAgent
*/
virtual Status wait() = 0;
/**
* The callback is called every interval seconds, with udata as the last
* parameter. The callback will return true to indicate retry and false
* to indicate abort.
*/
virtual void setCallback (TransportCallback cb, void * udata, int interval) = 0;
/**
* provides access to reply data
*
@ -147,6 +159,7 @@ class TransportAgent
/** normal HTTP URL encoded */
static const char * const m_contentTypeURLEncoded;
};
class TransportException : public SyncEvolutionException

View file

@ -1713,6 +1713,23 @@ void SyncTests::addTests() {
addTest(FilterTest(suspendTests));
}
if (config.retrySync &&
config.insertItem &&
config.updateItem &&
accessClientB &&
config.dump &&
config.compare) {
CppUnit::TestSuite *resendTests = new CppUnit::TestSuite(getName() + "::Resend");
ADD_TEST_TO_SUITE(resendTests, SyncTests, testResendClientAdd);
ADD_TEST_TO_SUITE(resendTests, SyncTests, testResendClientRemove);
ADD_TEST_TO_SUITE(resendTests, SyncTests, testResendClientUpdate);
ADD_TEST_TO_SUITE(resendTests, SyncTests, testResendServerAdd);
ADD_TEST_TO_SUITE(resendTests, SyncTests, testResendServerRemove);
ADD_TEST_TO_SUITE(resendTests, SyncTests, testResendServerUpdate);
ADD_TEST_TO_SUITE(resendTests, SyncTests, testResendFull);
addTest(FilterTest(resendTests));
}
}
}
@ -2752,6 +2769,47 @@ void SyncTests::doVarSizes(bool withMaxMsgSize,
compareDatabases();
}
class TransportResendInjector : public TransportWrapper{
private:
int timeout;
public:
TransportResendInjector()
:TransportWrapper() {
const char *s = getenv("CLIENT_TEST_RESEND_TIMEOUT");
timeout = s ? atoi(s) : 0;
}
~TransportResendInjector() {
}
virtual void send(const char *data, size_t len)
{
m_messageCount++;
if (m_interruptAtMessage >= 0 &&
m_messageCount == m_interruptAtMessage+1) {
m_wrappedAgent->send(data, len);
m_status = m_wrappedAgent->wait();
//trigger client side resend
sleep (timeout);
m_status = TIME_OUT;
}
else
{
m_wrappedAgent->send(data, len);
m_status = m_wrappedAgent->wait();
}
}
virtual void getReply(const char *&data, size_t &len, std::string &contentType) {
if (m_status == FAILED) {
data = "";
len = 0;
} else {
m_wrappedAgent->getReply(data, len, contentType);
}
}
};
class TransportFaultInjector : public TransportWrapper{
public:
TransportFaultInjector()
@ -2992,7 +3050,11 @@ void SyncTests::doInterruptResume(int changes,
// continue, wait until server timeout
if(sleep_t)
sleep (sleep_t);
accessClientB->doSync("retryB", SyncOptions(SYNC_TWO_WAY));
// no need for resend tests
if (!dynamic_cast <TransportResendInjector *> (wrapper.get())) {
accessClientB->doSync("retryB", SyncOptions(SYNC_TWO_WAY));
}
}
// copy changes to client A
@ -3120,6 +3182,42 @@ void SyncTests::testUserSuspendFull()
SERVER_ADD|SERVER_REMOVE|SERVER_UPDATE, boost::shared_ptr<TransportWrapper> (new UserSuspendInjector()));
}
void SyncTests::testResendClientAdd()
{
doInterruptResume(CLIENT_ADD, boost::shared_ptr<TransportWrapper> (new TransportResendInjector()));
}
void SyncTests::testResendClientRemove()
{
doInterruptResume(CLIENT_REMOVE, boost::shared_ptr<TransportWrapper> (new TransportResendInjector()));
}
void SyncTests::testResendClientUpdate()
{
doInterruptResume(CLIENT_UPDATE, boost::shared_ptr<TransportWrapper> (new TransportResendInjector()));
}
void SyncTests::testResendServerAdd()
{
doInterruptResume(SERVER_ADD, boost::shared_ptr<TransportWrapper> (new TransportResendInjector()));
}
void SyncTests::testResendServerRemove()
{
doInterruptResume(SERVER_REMOVE, boost::shared_ptr<TransportWrapper> (new TransportResendInjector()));
}
void SyncTests::testResendServerUpdate()
{
doInterruptResume(SERVER_UPDATE, boost::shared_ptr<TransportWrapper> (new TransportResendInjector()));
}
void SyncTests::testResendFull()
{
doInterruptResume(CLIENT_ADD|CLIENT_REMOVE|CLIENT_UPDATE|
SERVER_ADD|SERVER_REMOVE|SERVER_UPDATE,
boost::shared_ptr<TransportWrapper> (new TransportResendInjector()));
}
void SyncTests::doSync(const SyncOptions &options)
{

View file

@ -943,6 +943,13 @@ protected:
virtual void testUserSuspendServerUpdate();
virtual void testUserSuspendFull();
virtual void testResendClientAdd();
virtual void testResendClientRemove();
virtual void testResendClientUpdate();
virtual void testResendServerAdd();
virtual void testResendServerRemove();
virtual void testResendServerUpdate();
virtual void testResendFull();
/**
* implements testMaxMsg(), testLargeObject(), testLargeObjectEncoded()
@ -1009,6 +1016,8 @@ public:
m_options = NULL;
}
virtual Status wait() { return m_status; }
virtual void setCallback (TransportCallback cb, void *udata, int interval)
{ return m_wrappedAgent->setCallback(cb, udata, interval);}
};
/** assert equality, include string in message if unequal */