TransportAgent: simplified timeout API

Instead of allowing users of the API to register a callback
which can choose between aborting and time out, only accept
the timeout duration and always treat that as a timeout.

The advanced functionality wasn't used and the simplification
makes implementing the API easier.
This commit is contained in:
Patrick Ohly 2011-02-09 15:02:13 +01:00
parent 75c75dcf97
commit 40f84d7fc1
13 changed files with 60 additions and 130 deletions

View File

@ -2416,16 +2416,14 @@ class DBusTransportAgent : public TransportAgent
std::string m_type;
/*
* When the callback is invoked, we always abort the current
* When the timeout occurs, we always abort the current
* transmission. If it is invoked while we are not in the wait()
* of this transport, then we remember that in m_eventTriggered
* and return from wait() right away. The main loop is only
* quit when the transport is waiting in it. This is a precaution
* to not interfere with other parts of the code.
*/
TransportCallback m_callback;
void *m_callbackData;
int m_callbackInterval;
int m_timeoutSeconds;
GLibEvent m_eventSource;
bool m_eventTriggered;
bool m_waiting;
@ -2448,11 +2446,9 @@ class DBusTransportAgent : public TransportAgent
virtual void cancel() {}
virtual void shutdown();
virtual Status wait(bool noReply = false);
virtual void setCallback (TransportCallback cb, void * udata, int interval)
virtual void setTimeout(int seconds)
{
m_callback = cb;
m_callbackData = udata;
m_callbackInterval = interval;
m_timeoutSeconds = seconds;
m_eventSource = 0;
}
virtual void getReply(const char *&data, size_t &len, std::string &contentType);
@ -3049,11 +3045,7 @@ boost::shared_ptr<TransportAgent> DBusSync::createTransportAgent()
// client (API not designed for it), let's use the hard timeout
// from RetryDuration here.
int timeout = getRetryDuration();
if (timeout) {
agent->setCallback(transport_cb,
reinterpret_cast<void *>(static_cast<uintptr_t>(timeout)),
timeout);
}
agent->setTimeout(timeout);
return agent;
} else {
// no connection, use HTTP via libsoup/GMainLoop
@ -4627,7 +4619,7 @@ DBusTransportAgent::DBusTransportAgent(GMainLoop *loop,
m_loop(loop),
m_session(session),
m_connection(connection),
m_callback(NULL),
m_timeoutSeconds(0),
m_eventTriggered(false),
m_waiting(false)
{
@ -4660,9 +4652,8 @@ void DBusTransportAgent::send(const char *data, size_t len)
connection->m_state = Connection::WAITING;
connection->m_incomingMsg = SharedBuffer();
// setup regular callback
if (m_callback) {
m_eventSource = g_timeout_add_seconds(m_callbackInterval, timeoutCallback, static_cast<gpointer>(this));
if (m_timeoutSeconds) {
m_eventSource = g_timeout_add_seconds(m_timeoutSeconds, timeoutCallback, static_cast<gpointer>(this));
}
m_eventTriggered = false;
@ -4694,8 +4685,6 @@ void DBusTransportAgent::shutdown()
gboolean DBusTransportAgent::timeoutCallback(gpointer transport)
{
DBusTransportAgent *me = static_cast<DBusTransportAgent *>(transport);
me->m_callback(me->m_callbackData);
// TODO: check or remove return code from callback?!
me->m_eventTriggered = true;
if (me->m_waiting) {
g_main_loop_quit(me->m_loop);

View File

@ -34,7 +34,7 @@ CurlTransportAgent::CurlTransportAgent() :
m_easyHandle(easyInit()),
m_slist(NULL),
m_status(INACTIVE),
m_cb(NULL),
m_timeoutSeconds(0),
m_reply(NULL),
m_replyLen(0),
m_replySize(0)
@ -143,11 +143,9 @@ void CurlTransportAgent::setSSL(const std::string &cacerts,
checkCurl(code);
}
void CurlTransportAgent::setCallback (TransportCallback cb, void *udata, int interval)
void CurlTransportAgent::setTimeout(int seconds)
{
m_cb = cb;
m_cbData = udata;
m_cbInterval = interval;
m_timeoutSeconds = seconds;
}
void CurlTransportAgent::shutdown()
@ -179,7 +177,7 @@ void CurlTransportAgent::send(const char *data, size_t len)
m_slist = curl_slist_append(m_slist, contentHeader.c_str());
m_status = ACTIVE;
if(m_cb){
if (m_timeoutSeconds) {
m_sendStartTime = time(NULL);
}
m_aborting = false;
@ -291,17 +289,10 @@ int CurlTransportAgent::progressCallback(void* transport, double, double, double
int CurlTransportAgent::processCallback()
{
if (m_cb){
if (m_timeoutSeconds) {
time_t curTime = time(NULL);
if (curTime - m_sendStartTime > m_cbInterval){
//change here to avoid duplicate call back to the upper layer
m_sendStartTime = curTime;
bool cont = m_cb (m_cbData);
if (cont) {
m_status = TIME_OUT;
}else {
m_aborting = true;
}
if (curTime - m_sendStartTime > m_timeoutSeconds) {
m_status = TIME_OUT;
return -1;
}
}

View File

@ -56,7 +56,7 @@ class CurlTransportAgent : public HTTPTransportAgent
virtual void cancel();
virtual Status wait(bool noReply = false);
virtual void getReply(const char *&data, size_t &len, std::string &contentType);
virtual void setCallback (TransportCallback cb, void * udata, int interval);
virtual void setTimeout(int seconds);
int processCallback();
void setAborting(bool aborting) {m_aborting = aborting;}
@ -67,10 +67,8 @@ class CurlTransportAgent : public HTTPTransportAgent
Status m_status;
bool m_aborting;
TransportCallback m_cb;
void *m_cbData;
time_t m_sendStartTime;
int m_cbInterval;
int m_timeoutSeconds;
/**
* libcurl < 7.17.0 does not copy strings passed into curl_easy_setopt().

View File

@ -585,7 +585,7 @@ void LocalTransportAgent::getReply(const char *&data, size_t &len, std::string &
}
}
void LocalTransportAgent::setCallback (TransportCallback cb, void * udata, int interval)
void LocalTransportAgent::setTimeout(int seconds)
{
// TODO: implement timeout mechanism
}

View File

@ -112,7 +112,7 @@ class LocalTransportAgent : public TransportAgent
virtual void cancel();
virtual Status wait(bool noReply = false);
virtual void getReply(const char *&data, size_t &len, std::string &contentType);
virtual void setCallback (TransportCallback cb, void * udata, int interval);
virtual void setTimeout(int seconds);
private:
SyncContext *m_server;

View File

@ -49,7 +49,7 @@ ObexTransportAgent::ObexTransportAgent (OBEX_TRANS_TYPE type, GMainLoop *loop) :
m_port(-1),
m_buffer(NULL),
m_bufferSize(0),
m_cb(NULL),
m_timeoutSeconds(0),
m_disconnecting(false),
m_connectStatus(START)
{
@ -92,11 +92,9 @@ void ObexTransportAgent::setContentType(const std::string &type) {
}
void ObexTransportAgent::setCallback (TransportCallback cb, void *data, int interval)
void ObexTransportAgent::setTimeout(int seconds)
{
m_cb = cb;
m_cbData = data;
m_cbInterval = interval;
m_timeoutSeconds = seconds;
}
void ObexTransportAgent::connect() {
@ -627,22 +625,17 @@ gboolean ObexTransportAgent::obex_fd_source_cb_impl (GIOChannel *io, GIOConditio
}
time_t now = time(NULL);
if (m_cb && (m_requestStart != 0)
&& (now - m_requestStart > m_cbInterval)) {
if (m_timeoutSeconds &&
(m_requestStart != 0) &&
(now >= m_timeoutSeconds +m_requestStart)) {
m_sock = sockObj;
m_channel = channel;
if (m_cb (m_cbData)){
//timeout
m_status = TIME_OUT;
//currently we will not support transport resend for
//OBEX transport ??
m_disconnecting = true;
cancel();
} else {
//abort
m_disconnecting = true;
cancel();
}
//timeout
m_status = TIME_OUT;
//currently we will not support transport resend for
//OBEX transport ??
m_disconnecting = true;
cancel();
return TRUE;
}

View File

@ -106,7 +106,7 @@ class ObexTransportAgent : public TransportAgent
virtual void cancel();
virtual Status wait(bool noReply);
virtual void getReply(const char *&data, size_t &len, std::string &contentType);
virtual void setCallback (TransportCallback cb, void *udata, int interval);
virtual void setTimeout(int seconds);
/* Obex specific api: connecting the underlying transport */
void connect();
@ -174,10 +174,7 @@ class ObexTransportAgent : public TransportAgent
sdp_session_t *m_sdp;
/* callback supplied by the user of the transport */
TransportCallback m_cb;
void *m_cbData;
time_t m_cbInterval;
int m_timeoutSeconds;
time_t m_requestStart;
/** OBEX poll interval */
static const int OBEX_POLL_INTERVAL = 1;

View File

@ -40,7 +40,7 @@ SoupTransportAgent::SoupTransportAgent(GMainLoop *loop) :
g_main_loop_new(NULL, TRUE),
"Soup main loop"),
m_status(INACTIVE),
m_cb(NULL),
m_timeoutSeconds(0),
m_response(0)
{
#ifdef HAVE_LIBSOUP_SOUP_GNOME_FEATURES_H
@ -114,11 +114,9 @@ void SoupTransportAgent::setUserAgent(const std::string &agent)
NULL);
}
void SoupTransportAgent::setCallback (TransportCallback cb, void *data, int interval)
void SoupTransportAgent::setTimeout(int seconds)
{
m_cb = cb;
m_cbData = data;
m_cbInterval = interval;
m_timeoutSeconds = seconds;
}
void SoupTransportAgent::send(const char *data, size_t len)
@ -146,9 +144,9 @@ void SoupTransportAgent::send(const char *data, size_t len)
SOUP_MEMORY_TEMPORARY, data, len);
m_status = ACTIVE;
m_abortEventSource = g_timeout_add_seconds(ABORT_CHECK_INTERVAL, AbortCallback, static_cast<gpointer> (this));
if(m_cb){
if (m_timeoutSeconds) {
m_message = message.get();
m_cbEventSource = g_timeout_add_seconds(m_cbInterval, TimeoutCallback, static_cast<gpointer> (this));
m_timeoutEventSource = g_timeout_add_seconds(m_timeoutSeconds, TimeoutCallback, static_cast<gpointer> (this));
}
soup_session_queue_message(m_session.get(), message.release(),
SessionCallback, static_cast<gpointer>(this));
@ -197,7 +195,7 @@ TransportAgent::Status SoupTransportAgent::wait(bool noReply)
}
m_abortEventSource.set(0);
m_cbEventSource.set(0);
m_timeoutEventSource.set(0);
return m_status;
}
@ -269,15 +267,10 @@ gboolean SoupTransportAgent::AbortCallback(gpointer transport)
gboolean SoupTransportAgent::processCallback()
{
bool cont = m_cb(m_cbData);
if(cont){
//stop the message processing and mark status as timeout
guint message_status = SOUP_STATUS_CANCELLED;
soup_session_cancel_message(m_session.get(), m_message, message_status);
m_status = TIME_OUT;
}else {
cancel();
}
//stop the message processing and mark status as timeout
guint message_status = SOUP_STATUS_CANCELLED;
soup_session_cancel_message(m_session.get(), m_message, message_status);
m_status = TIME_OUT;
return FALSE;
}

View File

@ -71,7 +71,7 @@ class SoupTransportAgent : public HTTPTransportAgent
virtual void cancel();
virtual Status wait(bool noReply = false);
virtual void getReply(const char *&data, size_t &len, std::string &contentType);
virtual void setCallback (TransportCallback cb, void *udata, int interval);
virtual void setTimeout(int seconds);
gboolean processCallback();
private:
std::string m_proxyUser;
@ -87,10 +87,8 @@ class SoupTransportAgent : public HTTPTransportAgent
GLibEvent m_abortEventSource;
SoupMessage *m_message;
GLibEvent m_cbEventSource;
TransportCallback m_cb;
int m_cbInterval;
void *m_cbData;
GLibEvent m_timeoutEventSource;
int m_timeoutSeconds;
/** User Abort check interval */
static const gint ABORT_CHECK_INTERVAL = 1;

View File

@ -1492,6 +1492,7 @@ boost::shared_ptr<TransportAgent> SyncContext::createTransportAgent(void *gmainl
if (m_localSync) {
string peer = url.substr(strlen("local://"));
boost::shared_ptr<LocalTransportAgent> agent(new LocalTransportAgent(this, peer, gmainloop));
agent->setTimeout(timeout);
agent->start();
return agent;
} else if (boost::starts_with(url, "http://") ||
@ -1500,22 +1501,13 @@ boost::shared_ptr<TransportAgent> SyncContext::createTransportAgent(void *gmainl
boost::shared_ptr<SoupTransportAgent> agent(new SoupTransportAgent(static_cast<GMainLoop *>(gmainloop)));
agent->setConfig(*this);
if (timeout) {
agent->setCallback(transport_cb,
reinterpret_cast<void *>(static_cast<uintptr_t>(timeout)),
timeout);
}
agent->setTimeout(timeout);
return agent;
#elif defined(ENABLE_LIBCURL)
if (!gmainloop) {
boost::shared_ptr<CurlTransportAgent> agent(new CurlTransportAgent());
agent->setConfig(*this);
if (timeout) {
agent->setCallback(transport_cb,
reinterpret_cast<void *>(static_cast<uintptr_t>(timeout)),
timeout);
}
agent->setTimeout(timeout);
return agent;
}
#endif
@ -1525,11 +1517,7 @@ boost::shared_ptr<TransportAgent> SyncContext::createTransportAgent(void *gmainl
boost::shared_ptr<ObexTransportAgent> agent(new ObexTransportAgent(ObexTransportAgent::OBEX_BLUETOOTH,
static_cast<GMainLoop *>(gmainloop)));
agent->setURL (btUrl);
if (timeout) {
agent->setCallback(transport_cb,
reinterpret_cast<void *>(static_cast<uintptr_t>(timeout)),
timeout);
}
agent->setTimeout(timeout);
agent->connect();
return agent;
#endif
@ -2038,17 +2026,6 @@ void SyncContext::startSourceAccess(SyncSource *source)
m_sourceListPtr->syncPrepare(source->getName());
}
bool SyncContext::transport_cb (void *udata)
{
unsigned int interval = reinterpret_cast<uintptr_t>(udata);
SE_LOG_INFO(NULL, NULL, "Transport timeout after %u:%02umin",
interval / 60,
interval % 60);
// never cancel the transport, the higher levels will deal
// with the timeout
return true;
}
// XML configuration converted to C string constants
extern "C" {
// including all known fragments for a client

View File

@ -661,9 +661,8 @@ class SyncContext : public SyncConfig, public ConfigUserInterface {
*
* The agent must be ready for use:
* - HTTP specific settings must have been applied
* - the current SyncContect's transport_cb() must have been
* installed via TransportAgent::setCallback(), with a suitable
* timeout for the agent
* - the current SyncContext's timeout must have been
* installed via TransportAgent::setTimeout()
*
* The default implementation instantiates one of the builtin
* transport agents, depending on how it was compiled.
@ -818,8 +817,6 @@ class SyncContext : public SyncConfig, public ConfigUserInterface {
ostream *m_out;
public:
static bool transport_cb (void *data);
/**
* Returns the URL in the getSyncURL() list which is to be used
* for sync. The long term goal is to pick the first URL which

View File

@ -46,8 +46,6 @@ class SyncConfig;
class TransportAgent
{
public:
typedef bool (*TransportCallback) (void *udata);
/**
* set transport specific URL of next message
*/
@ -134,14 +132,14 @@ class TransportAgent
virtual Status wait(bool noReply = false) = 0;
/**
* The callback is called at most once while a transmission is
* pending, "interval" seconds after calling send(), with udata as
* the last parameter. When the callback returns true, the
* transport will stop waiting for a reply to the message and flag
* a timeout. When the callback returns false, the transport will
* cancel the transmission.
* Tells the transport agent to stop the transmission the given
* amount of seconds after send() was called. The transport agent
* will then stop the message transmission and return a TIME_OUT
* status in wait().
*
* @param seconds number of seconds to wait before timing out, zero for no timeout
*/
virtual void setCallback (TransportCallback cb, void * udata, int interval) = 0;
virtual void setTimeout(int seconds) = 0;
/**
* provides access to reply data

View File

@ -858,8 +858,7 @@ public:
m_wrappedAgent.reset();
}
virtual Status wait(bool noReply = false) { return m_status; }
virtual void setCallback (TransportCallback cb, void *udata, int interval)
{ return m_wrappedAgent->setCallback(cb, udata, interval);}
virtual void setTimeout(int seconds) { m_wrappedAgent->setTimeout(seconds); }
};
/** assert equality, include string in message if unequal */