LocalTransportAgent: implemented timeout handling

Timeouts for the SyncML message are detected on the server side.
No resending is possible, nor needed: this is a reliable local
transport after all, not HTTP.

The implementation uses select() and non-blocking sockets to time out
at the right time. While at it, it also sets CLOEXEC to ensure that
the pipes are not accidentally inherited by other child programs.
This commit is contained in:
Patrick Ohly 2011-02-09 15:33:20 +01:00
parent 40f84d7fc1
commit 834bede321
2 changed files with 144 additions and 59 deletions

View File

@ -28,6 +28,7 @@
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <algorithm>
@ -46,6 +47,7 @@ LocalTransportAgent::LocalTransportAgent(SyncContext *server,
m_server(server),
m_clientContext(SyncConfig::normalizeConfigString(clientContext)),
m_loop(static_cast<GMainLoop *>(loop)),
m_timeoutSeconds(0),
m_status(INACTIVE),
m_messageFD(-1),
m_statusFD(-1),
@ -98,6 +100,22 @@ void LocalTransportAgent::start()
m_server->throwError("socketpair()", errno);
}
// Set close-on-exec flag for all file descriptors:
// they are used for tracking the death of either
// parent or child, so additional processes should
// not inherit them.
//
// Also set them to non-blocking, needed for the
// timeout handling.
for (int *fd = &sockets[0][0];
fd < &sockets[2][2];
++fd) {
long flags = fcntl(*fd, F_GETFD);
fcntl(*fd, F_SETFD, flags | FD_CLOEXEC);
flags = fcntl(*fd, F_GETFL);
fcntl(*fd, F_SETFL, flags | O_NONBLOCK);
}
pid_t pid = fork();
switch (pid) {
case -1:
@ -170,6 +188,9 @@ void LocalTransportAgent::run()
--index;
}
// Ignore parent's timeout.
m_timeoutSeconds = 0;
// Now run. Under no circumstances must we leave this function,
// because our caller is not prepared for running inside a forked
// process.
@ -276,7 +297,7 @@ void LocalTransportAgent::run()
m_clientReport.getError().c_str(),
data->c_str());
node.flush();
writeMessage(m_statusFD, Message::MSG_SYNC_REPORT, data->c_str(), data->size());
writeMessage(m_statusFD, Message::MSG_SYNC_REPORT, data->c_str(), data->size(), 0);
} catch (...) {
SyncMLStatus status = m_clientReport.getStatus();
Exception::handle(&status, redirect);
@ -341,16 +362,19 @@ void LocalTransportAgent::receiveChildReport()
try {
SE_LOG_DEBUG(NULL, NULL, "parent: receiving report");
m_receiveBuffer.m_used = 0;
readMessage(statusFD, m_receiveBuffer);
boost::shared_ptr<std::string> data(new std::string);
data->assign(m_receiveBuffer.m_message->m_data, m_receiveBuffer.m_message->getDataLength());
boost::shared_ptr<StringDataBlob> dump(new StringDataBlob("buffer", data, false));
IniHashConfigNode node(dump);
node >> m_clientReport;
SE_LOG_DEBUG(NULL, NULL, "parent: received report (%s/ERROR '%s'):\n%s",
Status2String(m_clientReport.getStatus()).c_str(),
m_clientReport.getError().c_str(),
data->c_str());
if (readMessage(statusFD, m_receiveBuffer, deadline())) {
boost::shared_ptr<std::string> data(new std::string);
data->assign(m_receiveBuffer.m_message->m_data, m_receiveBuffer.m_message->getDataLength());
boost::shared_ptr<StringDataBlob> dump(new StringDataBlob("buffer", data, false));
IniHashConfigNode node(dump);
node >> m_clientReport;
SE_LOG_DEBUG(NULL, NULL, "parent: received report (%s/ERROR '%s'):\n%s",
Status2String(m_clientReport.getStatus()).c_str(),
m_clientReport.getError().c_str(),
data->c_str());
} else {
SE_LOG_DEBUG(NULL, NULL, "parent: timeout receiving report");
}
} catch (...) {
close(statusFD);
throw;
@ -389,6 +413,7 @@ bool LocalTransportAgent::Buffer::haveMessage()
void LocalTransportAgent::send(const char *data, size_t len)
{
m_status = ACTIVE;
if (m_loop) {
SE_THROW("glib support not implemented");
} else {
@ -403,12 +428,12 @@ void LocalTransportAgent::send(const char *data, size_t len)
m_receiveBuffer.m_used - len);
m_receiveBuffer.m_used -= len;
}
writeMessage(m_messageFD, m_sendType, data, len);
m_sendStartTime = time(NULL);
writeMessage(m_messageFD, m_sendType, data, len, deadline());
}
m_status = ACTIVE;
}
void LocalTransportAgent::writeMessage(int fd, Message::Type type, const char *data, size_t len)
bool LocalTransportAgent::writeMessage(int fd, Message::Type type, const char *data, size_t len, time_t deadline)
{
Message header;
header.m_type = type;
@ -424,29 +449,68 @@ void LocalTransportAgent::writeMessage(int fd, Message::Type type, const char *d
(long)len,
fd == m_messageFD ? "message channel" : "other channel");
do {
ssize_t sent = writev(fd, vec, 2);
if (sent == -1) {
SE_LOG_DEBUG(NULL, NULL, "%s: sending %ld bytes failed: %s",
// sleep, possibly with a deadline
fd_set writefds;
FD_ZERO(&writefds);
FD_SET(fd, &writefds);
timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = 0;
if (deadline) {
time_t now = time(NULL);
if (now >= deadline) {
return false;
} else {
timeout.tv_sec = deadline - now;
}
}
SE_LOG_DEBUG(NULL, NULL, "%s: write select on %s %ld.%lds",
m_pid ? "parent" : "child",
fd == m_messageFD ? "message channel" : "other channel",
(long)timeout.tv_sec,
(long)timeout.tv_usec);
int res = select(fd + 1, NULL, &writefds, NULL,
(timeout.tv_sec || timeout.tv_usec) ? &timeout : NULL);
switch (res) {
case 0:
SE_LOG_DEBUG(NULL, NULL, "%s: select timeout",
m_pid ? "parent" : "child");
return false;
break;
case 1: {
ssize_t sent = writev(fd, vec, 2);
if (sent == -1) {
SE_LOG_DEBUG(NULL, NULL, "%s: sending %ld bytes failed: %s",
m_pid ? "parent" : "child",
(long)len,
strerror(errno));
SE_THROW_EXCEPTION(TransportException,
StringPrintf("writev(): %s", strerror(errno)));
}
// potential partial write, reduce byte counters by amount of bytes sent
ssize_t part1 = std::min((ssize_t)vec[0].iov_len, sent);
vec[0].iov_len -= part1;
sent -= part1;
ssize_t part2 = std::min((ssize_t)vec[1].iov_len, sent);
vec[1].iov_len -= part2;
sent -= part2;
break;
}
default:
SE_LOG_DEBUG(NULL, NULL, "%s: select errror: %s",
m_pid ? "parent" : "child",
(long)len,
strerror(errno));
SE_THROW_EXCEPTION(TransportException,
StringPrintf("writev(): %s", strerror(errno)));
StringPrintf("select(): %s", strerror(errno)));
break;
}
// potential partial write, reduce byte counters by amount of bytes sent
ssize_t part1 = std::min((ssize_t)vec[0].iov_len, sent);
vec[0].iov_len -= part1;
sent -= part1;
ssize_t part2 = std::min((ssize_t)vec[1].iov_len, sent);
vec[1].iov_len -= part2;
sent -= part2;
// might be completely sent now, check
} while (vec[1].iov_len);
SE_LOG_DEBUG(NULL, NULL, "%s: sending %ld bytes done",
m_pid ? "parent" : "child",
(long)len);
return true;
}
void LocalTransportAgent::cancel()
@ -455,56 +519,66 @@ void LocalTransportAgent::cancel()
TransportAgent::Status LocalTransportAgent::wait(bool noReply)
{
switch (m_status) {
case ACTIVE:
if (m_status == ACTIVE) {
// need next message; for noReply == true we are done
if (noReply) {
m_status = INACTIVE;
return m_status;
}
if (m_loop) {
} else if (m_loop) {
SE_THROW("glib support not implemented");
} else {
if (!m_receiveBuffer.haveMessage()) {
readMessage(m_messageFD, m_receiveBuffer);
}
// complete message received, check if it is SyncML
switch (m_receiveBuffer.m_message->m_type) {
case Message::MSG_SYNCML_XML:
case Message::MSG_SYNCML_WBXML:
m_status = GOT_REPLY;
break;
default:
// TODO: handle other types
SE_THROW("unsupported message type");
break;
if (readMessage(m_messageFD, m_receiveBuffer, deadline())) {
// complete message received, check if it is SyncML
switch (m_receiveBuffer.m_message->m_type) {
case Message::MSG_SYNCML_XML:
case Message::MSG_SYNCML_WBXML:
m_status = GOT_REPLY;
break;
default:
// TODO: handle other types
SE_THROW("unsupported message type");
break;
}
} else {
m_status = TIME_OUT;
}
}
}
break;
default:
return m_status;
}
return m_status;
}
void LocalTransportAgent::readMessage(int fd, Buffer &buffer)
bool LocalTransportAgent::readMessage(int fd, Buffer &buffer, time_t deadline)
{
while (!buffer.haveMessage()) {
// use select to implement timeout (TODO)
// use select to implement timeout
fd_set readfds;
FD_ZERO(&readfds);
FD_SET(fd, &readfds);
SE_LOG_DEBUG(NULL, NULL, "%s: waiting on %s",
timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = 0;
if (deadline) {
time_t now = time(NULL);
if (now >= deadline) {
// already too late
return false;
} else {
timeout.tv_sec = deadline - now;
}
}
SE_LOG_DEBUG(NULL, NULL, "%s: read select on %s %ld.%lds",
m_pid ? "parent" : "child",
fd == m_messageFD ? "message channel" : "other channel");
int res = select(fd + 1, &readfds, NULL, NULL, NULL);
fd == m_messageFD ? "message channel" : "other channel",
(long)timeout.tv_sec,
(long)timeout.tv_usec);
int res = select(fd + 1, &readfds, NULL, NULL,
(timeout.tv_sec || timeout.tv_usec) ? &timeout : NULL);
switch (res) {
case 0:
SE_LOG_DEBUG(NULL, NULL, "%s: select timeout",
m_pid ? "parent" : "child");
// TODO: timeout
SE_THROW("internal error, unexpected timeout");
return false;
break;
case 1: {
// data ready, ensure that buffer is available
@ -560,6 +634,8 @@ void LocalTransportAgent::readMessage(int fd, Buffer &buffer)
break;
}
}
return true;
}
void LocalTransportAgent::getReply(const char *&data, size_t &len, std::string &contentType)
@ -587,7 +663,7 @@ void LocalTransportAgent::getReply(const char *&data, size_t &len, std::string &
void LocalTransportAgent::setTimeout(int seconds)
{
// TODO: implement timeout mechanism
m_timeoutSeconds = seconds;
}
SE_END_CXX

View File

@ -118,6 +118,8 @@ class LocalTransportAgent : public TransportAgent
SyncContext *m_server;
string m_clientContext;
GMainLoop *m_loop;
int m_timeoutSeconds;
time_t m_sendStartTime;
Status m_status;
@ -167,22 +169,29 @@ class LocalTransportAgent : public TransportAgent
/**
* Write Message with given type into file descriptor.
* Retries until error or all data written.
*
* @return true for success, false if deadline is reached, exception for fatal error
*/
void writeMessage(int fd, Message::Type type, const char *data, size_t len);
bool writeMessage(int fd, Message::Type type, const char *data, size_t len, time_t deadline);
/**
* Read bytes into buffer until complete Message
* is assembled. Will read additional bytes beyond
* end of that Message if available. An existing
* complete message is not overwritten.
*
* @return true for success, false if deadline is reached, exception for fatal error
*/
void readMessage(int fd, Buffer &buffer);
bool readMessage(int fd, Buffer &buffer, time_t deadline);
/** utility function for parent: copy child's report into m_clientReport */
void receiveChildReport();
/** utility function for parent: check m_clientReport and log/throw errors */
void checkChildReport();
/** utility function: calculate deadline for operation starting now */
time_t deadline() { return m_timeoutSeconds ? (time(NULL) + m_timeoutSeconds) : 0; }
};
SE_END_CXX