LocalTransportAgent: refactored read/write code

Moved the read/write code into subroutines. Added support for partial
writes.
This commit is contained in:
Patrick Ohly 2010-11-03 10:01:27 +01:00
parent e60ba32b9c
commit b9b38520fa
2 changed files with 166 additions and 115 deletions

View File

@ -27,18 +27,11 @@
#include <sys/types.h>
#include <sys/wait.h>
#include <algorithm>
#include <syncevo/declarations.h>
SE_BEGIN_CXX
/**
* SyncML message data. Header followed directly by data.
*/
struct SyncMLMessage
{
Message m_message;
char m_data[0];
};
class NoopAgentDestructor
{
public:
@ -52,8 +45,6 @@ LocalTransportAgent::LocalTransportAgent(SyncContext *server,
m_clientContext(SyncConfig::normalizeConfigString(clientContext)),
m_loop(static_cast<GMainLoop *>(loop)),
m_status(INACTIVE),
m_receiveBufferSize(0),
m_receivedBytes(0),
m_pid(0)
{
}
@ -249,42 +240,60 @@ void LocalTransportAgent::send(const char *data, size_t len)
SE_THROW("glib support not implemented");
} else {
// first throw away old received message
if (m_receivedBytes >= sizeof(Message) &&
m_receiveBuffer->m_length <= m_receivedBytes) {
size_t len = m_receiveBuffer->m_length;
if (m_receiveBuffer.haveMessage()) {
size_t len = m_receiveBuffer.m_message->m_length;
// memmove() probably never necessary because receiving
// ends directly after complete message, but doesn't hurt
// either...
memmove(m_receiveBuffer.get(),
(char *)m_receiveBuffer.get() + len,
m_receivedBytes - len);
m_receivedBytes -= len;
memmove(m_receiveBuffer.m_message.get(),
(char *)m_receiveBuffer.m_message.get() + len,
m_receiveBuffer.m_used - len);
m_receiveBuffer.m_used -= len;
}
writeMessage(m_messageFD, m_sendType, data, len);
}
m_status = ACTIVE;
}
SyncMLMessage header;
header.m_message.m_type = m_sendType;
header.m_message.m_length = sizeof(Message) + len;
struct iovec vec[2];
vec[0].iov_base = &header;
vec[0].iov_len = offsetof(SyncMLMessage, m_data);
vec[1].iov_base = (void *)data;
vec[1].iov_len = len;
// TODO: handle timeouts and aborts while writing
SE_LOG_DEBUG(NULL, NULL, "%s: sending %ld bytes",
m_pid ? "parent" : "child",
(long)len);
if (writev(m_messageFD, vec, 2) == -1) {
SE_LOG_DEBUG(NULL, NULL, "%s: sending %ld bytes failed",
void LocalTransportAgent::writeMessage(int fd, Message::Type type, const char *data, size_t len)
{
Message header;
header.m_type = type;
header.m_length = sizeof(Message) + len;
struct iovec vec[2];
vec[0].iov_base = &header;
vec[0].iov_len = offsetof(Message, m_data);
vec[1].iov_base = (void *)data;
vec[1].iov_len = len;
// TODO: handle timeouts and aborts while writing
SE_LOG_DEBUG(NULL, NULL, "%s: sending %ld bytes via %s",
m_pid ? "parent" : "child",
(long)len,
fd == m_messageFD ? "message channel" : "other channel");
do {
ssize_t sent = writev(fd, vec, 2);
if (sent == -1) {
SE_LOG_DEBUG(NULL, NULL, "%s: sending %ld bytes failed: %s",
m_pid ? "parent" : "child",
(long)len);
(long)len,
strerror(errno));
SE_THROW_EXCEPTION(TransportException,
StringPrintf("writev(): %s", strerror(errno)));
}
SE_LOG_DEBUG(NULL, NULL, "%s: sending %ld bytes done",
m_pid ? "parent" : "child",
(long)len);
}
m_status = ACTIVE;
// potential partial write, reduce byte counters by amount of bytes sent
ssize_t part1 = std::min((ssize_t)vec[0].iov_len, sent);
vec[0].iov_len -= part1;
sent -= part1;
ssize_t part2 = std::min((ssize_t)vec[1].iov_len, sent);
vec[1].iov_len -= part2;
sent -= part2;
// might be completely sent now, check
} while (vec[1].iov_len);
SE_LOG_DEBUG(NULL, NULL, "%s: sending %ld bytes done",
m_pid ? "parent" : "child",
(long)len);
}
void LocalTransportAgent::cancel()
@ -304,75 +313,20 @@ TransportAgent::Status LocalTransportAgent::wait(bool noReply)
if (m_loop) {
SE_THROW("glib support not implemented");
} else {
while (m_status == ACTIVE &&
(!m_receiveBufferSize ||
m_receiveBufferSize < sizeof(Message) ||
m_receivedBytes < m_receiveBuffer->m_length)) {
// use select to implement timeout (TODO)
fd_set readfds;
FD_ZERO(&readfds);
FD_SET(m_messageFD, &readfds);
SE_LOG_DEBUG(NULL, NULL, "%s: waiting",
m_pid ? "parent" : "child");
int res = select(m_messageFD + 1, &readfds, NULL, NULL, NULL);
switch (res) {
case 0:
SE_LOG_DEBUG(NULL, NULL, "%s: select timeout",
m_pid ? "parent" : "child");
// TODO: timeout
SE_THROW("internal error, unexpected timeout");
break;
case 1: {
// data ready, ensure that buffer is available
if (!m_receiveBufferSize) {
m_receiveBufferSize = m_server->getMaxMsgSize();
m_receiveBuffer.set(static_cast<Message *>(malloc(m_receiveBufferSize)),
"Message Buffer");
} else if (m_receivedBytes >= sizeof(Message) &&
m_receiveBuffer->m_length > m_receiveBufferSize) {
m_receiveBuffer.set(static_cast<Message *>(realloc(m_receiveBuffer.release(), m_receiveBuffer->m_length)),
"Message Buffer");
}
SE_LOG_DEBUG(NULL, NULL, "%s: recv",
m_pid ? "parent" : "child");
ssize_t recvd = recv(m_messageFD,
(char *)m_receiveBuffer.get() + m_receivedBytes,
m_receiveBufferSize - m_receivedBytes,
MSG_DONTWAIT);
SE_LOG_DEBUG(NULL, NULL, "%s: received %ld",
m_pid ? "parent" : "child",
(long)recvd);
if (recvd < 0) {
SE_THROW_EXCEPTION(TransportException,
StringPrintf("message receive: %s", strerror(errno)));
} else if (!recvd) {
SE_THROW_EXCEPTION(TransportException,
"client has died unexpectedly");
}
m_receivedBytes += recvd;
break;
}
default:
SE_LOG_DEBUG(NULL, NULL, "%s: select errror: %s",
m_pid ? "parent" : "child",
strerror(errno));
SE_THROW_EXCEPTION(TransportException,
StringPrintf("select(): %s", strerror(errno)));
break;
}
if (!m_receiveBuffer.haveMessage()) {
readMessage(m_messageFD, m_receiveBuffer);
}
if (m_status == ACTIVE) {
// complete message received, check if it is SyncML
switch (m_receiveBuffer->m_type) {
case Message::MSG_SYNCML_XML:
case Message::MSG_SYNCML_WBXML:
m_status = GOT_REPLY;
break;
default:
// TODO: handle other types
SE_THROW("unsupported message type");
break;
}
// complete message received, check if it is SyncML
switch (m_receiveBuffer.m_message->m_type) {
case Message::MSG_SYNCML_XML:
case Message::MSG_SYNCML_WBXML:
m_status = GOT_REPLY;
break;
default:
// TODO: handle other types
SE_THROW("unsupported message type");
break;
}
}
break;
@ -382,12 +336,73 @@ TransportAgent::Status LocalTransportAgent::wait(bool noReply)
return m_status;
}
void LocalTransportAgent::readMessage(int fd, Buffer &buffer)
{
while (!buffer.haveMessage()) {
// use select to implement timeout (TODO)
fd_set readfds;
FD_ZERO(&readfds);
FD_SET(fd, &readfds);
SE_LOG_DEBUG(NULL, NULL, "%s: waiting on %s",
m_pid ? "parent" : "child",
fd == m_messageFD ? "message channel" : "other channel");
int res = select(fd + 1, &readfds, NULL, NULL, NULL);
switch (res) {
case 0:
SE_LOG_DEBUG(NULL, NULL, "%s: select timeout",
m_pid ? "parent" : "child");
// TODO: timeout
SE_THROW("internal error, unexpected timeout");
break;
case 1: {
// data ready, ensure that buffer is available
if (!buffer.m_size) {
buffer.m_size = m_server->getMaxMsgSize();
buffer.m_message.set(static_cast<Message *>(malloc(buffer.m_size)),
"Message Buffer");
} else if (buffer.m_used >= sizeof(Message) &&
buffer.m_message->m_length > buffer.m_size) {
buffer.m_message.set(static_cast<Message *>(realloc(buffer.m_message.release(), buffer.m_message->m_length)),
"Message Buffer");
}
SE_LOG_DEBUG(NULL, NULL, "%s: recv %ld bytes",
m_pid ? "parent" : "child",
(long)(buffer.m_size - buffer.m_used));
ssize_t recvd = recv(fd,
(char *)buffer.m_message.get() + buffer.m_used,
buffer.m_size - buffer.m_used,
MSG_DONTWAIT);
SE_LOG_DEBUG(NULL, NULL, "%s: received %ld: %s",
m_pid ? "parent" : "child",
(long)recvd,
recvd < 0 ? strerror(errno) : "okay");
if (recvd < 0) {
SE_THROW_EXCEPTION(TransportException,
StringPrintf("message receive: %s", strerror(errno)));
} else if (!recvd) {
SE_THROW_EXCEPTION(TransportException,
"client has died unexpectedly");
}
buffer.m_used += recvd;
break;
}
default:
SE_LOG_DEBUG(NULL, NULL, "%s: select errror: %s",
m_pid ? "parent" : "child",
strerror(errno));
SE_THROW_EXCEPTION(TransportException,
StringPrintf("select(): %s", strerror(errno)));
break;
}
}
}
void LocalTransportAgent::getReply(const char *&data, size_t &len, std::string &contentType)
{
if (m_status != GOT_REPLY) {
SE_THROW("internal error, no reply available");
}
switch (m_receiveBuffer->m_type) {
switch (m_receiveBuffer.m_message->m_type) {
case Message::MSG_SYNCML_XML:
contentType = m_contentTypeSyncML;
break;
@ -400,9 +415,8 @@ void LocalTransportAgent::getReply(const char *&data, size_t &len, std::string &
break;
}
if (!contentType.empty()) {
SyncMLMessage *msg = reinterpret_cast<SyncMLMessage *>(m_receiveBuffer.get());
len = m_receiveBuffer->m_length - offsetof(SyncMLMessage, m_data);
data = msg->m_data;
len = m_receiveBuffer.m_message->getDataLength();
data = m_receiveBuffer.m_message->m_data;
}
}

View File

@ -53,6 +53,12 @@ struct Message
/** length including header */
size_t m_length;
/** payload */
char m_data[0];
/** length excluding header */
size_t getDataLength() const { return m_length - offsetof(Message, m_data); }
};
@ -118,13 +124,30 @@ class LocalTransportAgent : public TransportAgent
Message::Type m_sendType;
/** buffer for message, with total length of buffer as size */
SmartPtr<Message *, Message *, UnrefFree<Message> > m_receiveBuffer;
size_t m_receiveBufferSize;
class Buffer {
public:
Buffer() :
m_size(0),
m_used(0)
{}
/** actual message */
SmartPtr<Message *, Message *, UnrefFree<Message> > m_message;
/** number of allocated bytes */
size_t m_size;
/** number of valid bytes in buffer */
size_t m_used;
/** number of received bytes in buffer */
size_t m_receivedBytes;
bool haveMessage() {
return m_used >= sizeof(Message) &&
m_message->m_length <= m_used;
}
} m_receiveBuffer;
/** read/write stream socket for sending/receiving messages */
/**
* Read/write stream socket for sending/receiving messages.
* Data is sent as struct Message (includes type and length),
* with per-type payload.
*/
int m_messageFD;
/** 0 in client, child PID in server */
@ -133,6 +156,20 @@ class LocalTransportAgent : public TransportAgent
SyncReport m_clientReport;
void run();
/**
* Write Message with given type into file descriptor.
* Retries until error or all data written.
*/
void writeMessage(int fd, Message::Type type, const char *data, size_t len);
/**
* Read bytes into buffer until complete Message
* is assembled. Will read additional bytes beyond
* end of that Message if available. An existing
* complete message is not overwritten.
*/
void readMessage(int fd, Buffer &buffer);
};
SE_END_CXX