LocalTransportAgent: added transmission of child's SyncReport
The SyncReport is transmitted via an additional socket pair. This is necessary because the one for messages is shut down to notify the peer of a premature shutdown. The parent uses the information obtained from the SyncReport to update its own status: first the error is logged (thus also setting the "first error seen" part of the parent), then an exception with a bad child status is thrown (interrupting anything the parent was doing and, if it was the first error, setting the overall status of the sync).
This commit is contained in:
parent
dc2ce796cb
commit
51111bea95
2 changed files with 153 additions and 33 deletions
|
@ -21,6 +21,8 @@
|
|||
#include <syncevo/SyncContext.h>
|
||||
#include <syncevo/SyncML.h>
|
||||
#include <syncevo/LogRedirect.h>
|
||||
#include <syncevo/StringDataBlob.h>
|
||||
#include <syncevo/IniConfigNode.h>
|
||||
|
||||
#include <stddef.h>
|
||||
#include <sys/socket.h>
|
||||
|
@ -45,12 +47,20 @@ LocalTransportAgent::LocalTransportAgent(SyncContext *server,
|
|||
m_clientContext(SyncConfig::normalizeConfigString(clientContext)),
|
||||
m_loop(static_cast<GMainLoop *>(loop)),
|
||||
m_status(INACTIVE),
|
||||
m_messageFD(-1),
|
||||
m_statusFD(-1),
|
||||
m_pid(0)
|
||||
{
|
||||
}
|
||||
|
||||
LocalTransportAgent::~LocalTransportAgent()
|
||||
{
|
||||
if (m_statusFD >= 0) {
|
||||
close(m_statusFD);
|
||||
}
|
||||
if (m_messageFD >= 0) {
|
||||
close(m_messageFD);
|
||||
}
|
||||
if (m_pid) {
|
||||
SE_LOG_DEBUG(NULL, NULL, "starting to wait for child process %ld in destructor", (long)m_pid);
|
||||
int status;
|
||||
|
@ -61,7 +71,7 @@ LocalTransportAgent::~LocalTransportAgent()
|
|||
|
||||
void LocalTransportAgent::start()
|
||||
{
|
||||
int sockets[2];
|
||||
int sockets[2][2];
|
||||
|
||||
// compare normalized context names to detect forbidden sync
|
||||
// within the same context; they could be set up, but are more
|
||||
|
@ -77,31 +87,49 @@ void LocalTransportAgent::start()
|
|||
SE_THROW(StringPrintf("invalid local sync inside context '%s', need second context with different databases", context.c_str()));
|
||||
}
|
||||
|
||||
if (socketpair(AF_LOCAL,
|
||||
SOCK_STREAM,
|
||||
0, sockets)) {
|
||||
m_server->throwError("socketpair()", errno);
|
||||
}
|
||||
memset(sockets, 0, sizeof(sockets));
|
||||
try {
|
||||
if (socketpair(AF_LOCAL,
|
||||
SOCK_STREAM,
|
||||
0, sockets[0]) ||
|
||||
socketpair(AF_LOCAL,
|
||||
SOCK_STREAM,
|
||||
0, sockets[1])) {
|
||||
m_server->throwError("socketpair()", errno);
|
||||
}
|
||||
|
||||
pid_t pid = fork();
|
||||
switch (pid) {
|
||||
case -1:
|
||||
m_server->throwError("fork()", errno);
|
||||
break;
|
||||
case 0:
|
||||
// child
|
||||
close(sockets[0]);
|
||||
m_messageFD = sockets[1];
|
||||
run();
|
||||
break;
|
||||
default:
|
||||
// parent
|
||||
close(sockets[1]);
|
||||
m_messageFD = sockets[0];
|
||||
// first message must come from child
|
||||
m_status = ACTIVE;
|
||||
m_pid = pid;
|
||||
break;
|
||||
pid_t pid = fork();
|
||||
switch (pid) {
|
||||
case -1:
|
||||
m_server->throwError("fork()", errno);
|
||||
break;
|
||||
case 0:
|
||||
// child
|
||||
close(sockets[0][0]);
|
||||
m_messageFD = sockets[0][1];
|
||||
close(sockets[1][0]);
|
||||
m_statusFD = sockets[1][1];
|
||||
run();
|
||||
break;
|
||||
default:
|
||||
// parent
|
||||
close(sockets[0][1]);
|
||||
m_messageFD = sockets[0][0];
|
||||
close(sockets[1][1]);
|
||||
m_statusFD = sockets[1][0];
|
||||
// first message must come from child
|
||||
m_status = ACTIVE;
|
||||
m_pid = pid;
|
||||
break;
|
||||
}
|
||||
} catch (...) {
|
||||
for (int *fd = &sockets[0][0];
|
||||
fd < &sockets[2][2];
|
||||
++fd) {
|
||||
if (*fd) {
|
||||
close(*fd);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -197,6 +225,30 @@ void LocalTransportAgent::run()
|
|||
m_clientReport.setStatus(status);
|
||||
}
|
||||
|
||||
// send final report
|
||||
try {
|
||||
if (m_messageFD >= 0) {
|
||||
close(m_messageFD);
|
||||
m_messageFD = -1;
|
||||
}
|
||||
|
||||
// matches parent's code in shutdown()
|
||||
boost::shared_ptr<std::string> data(new std::string);
|
||||
boost::shared_ptr<StringDataBlob> dump(new StringDataBlob("buffer", data, false));
|
||||
IniHashConfigNode node(dump);
|
||||
node << m_clientReport;
|
||||
SE_LOG_DEBUG(NULL, NULL, "client: sending report (%s/ERROR '%s'):\n%s",
|
||||
Status2String(m_clientReport.getStatus()).c_str(),
|
||||
m_clientReport.getError().c_str(),
|
||||
data->c_str());
|
||||
node.flush();
|
||||
writeMessage(m_statusFD, Message::MSG_SYNC_REPORT, data->c_str(), data->size());
|
||||
} catch (...) {
|
||||
SyncMLStatus status = m_clientReport.getStatus();
|
||||
Exception::handle(&status, redirect);
|
||||
m_clientReport.setStatus(status);
|
||||
}
|
||||
|
||||
if (redirect) {
|
||||
redirect->flush();
|
||||
}
|
||||
|
@ -221,16 +273,60 @@ void LocalTransportAgent::setContentType(const std::string &type)
|
|||
|
||||
void LocalTransportAgent::shutdown()
|
||||
{
|
||||
if (m_pid) {
|
||||
// TODO: get sync report, then wait for exit in child.
|
||||
if (m_messageFD) {
|
||||
// close message transports, tells peer to shut down
|
||||
close(m_messageFD);
|
||||
m_messageFD = -1;
|
||||
}
|
||||
|
||||
if (m_pid) {
|
||||
// parent: receive SyncReport
|
||||
receiveChildReport();
|
||||
|
||||
// join forked process
|
||||
int status;
|
||||
SE_LOG_DEBUG(NULL, NULL, "starting to wait for child process %ld in shutdown()", (long)m_pid);
|
||||
pid_t res = waitpid(m_pid, &status, 0);
|
||||
SE_LOG_DEBUG(NULL, NULL, "child %ld completed, status %d", (long)res, status);
|
||||
m_pid = 0;
|
||||
|
||||
// now relay the result from our child, will be added to
|
||||
// our own sync report if it doesn't have an error already
|
||||
checkChildReport();
|
||||
} else {
|
||||
close(m_messageFD);
|
||||
// child: sends SyncReport at the end of run()
|
||||
}
|
||||
}
|
||||
|
||||
void LocalTransportAgent::receiveChildReport()
|
||||
{
|
||||
SE_LOG_DEBUG(NULL, NULL, "parent: receiving report");
|
||||
m_receiveBuffer.m_used = 0;
|
||||
readMessage(m_statusFD, m_receiveBuffer);
|
||||
boost::shared_ptr<std::string> data(new std::string);
|
||||
data->assign(m_receiveBuffer.m_message->m_data, m_receiveBuffer.m_message->getDataLength());
|
||||
boost::shared_ptr<StringDataBlob> dump(new StringDataBlob("buffer", data, false));
|
||||
IniHashConfigNode node(dump);
|
||||
node >> m_clientReport;
|
||||
SE_LOG_DEBUG(NULL, NULL, "parent: received report (%s/ERROR '%s'):\n%s",
|
||||
Status2String(m_clientReport.getStatus()).c_str(),
|
||||
m_clientReport.getError().c_str(),
|
||||
data->c_str());
|
||||
}
|
||||
|
||||
void LocalTransportAgent::checkChildReport()
|
||||
{
|
||||
std::string childError = "child process failed";
|
||||
if (!m_clientReport.getError().empty()) {
|
||||
childError += ": ";
|
||||
childError += m_clientReport.getError();
|
||||
SE_LOG_ERROR(NULL, NULL, "%s", childError.c_str());
|
||||
}
|
||||
if (m_clientReport.getStatus() != STATUS_HTTP_OK &&
|
||||
m_clientReport.getStatus() != STATUS_OK) {
|
||||
SE_THROW_EXCEPTION_STATUS(TransportStatusException,
|
||||
childError,
|
||||
m_clientReport.getStatus());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -304,9 +400,8 @@ TransportAgent::Status LocalTransportAgent::wait(bool noReply)
|
|||
{
|
||||
switch (m_status) {
|
||||
case ACTIVE:
|
||||
// need next message; for noReply == true, it is the SyncReport (TODO)
|
||||
// need next message; for noReply == true we are done
|
||||
if (noReply) {
|
||||
// TODO: remove this code and send SyncReport as last message in client
|
||||
m_status = INACTIVE;
|
||||
return m_status;
|
||||
}
|
||||
|
@ -380,8 +475,17 @@ void LocalTransportAgent::readMessage(int fd, Buffer &buffer)
|
|||
SE_THROW_EXCEPTION(TransportException,
|
||||
StringPrintf("message receive: %s", strerror(errno)));
|
||||
} else if (!recvd) {
|
||||
SE_THROW_EXCEPTION(TransportException,
|
||||
"client has died unexpectedly");
|
||||
if (m_pid) {
|
||||
// Child died. Try to get its sync report to find out why.
|
||||
receiveChildReport();
|
||||
checkChildReport();
|
||||
// if no exception yet, raise a generic one
|
||||
SE_THROW_EXCEPTION(TransportException,
|
||||
"child has died unexpectedly");
|
||||
} else {
|
||||
SE_THROW_EXCEPTION(TransportException,
|
||||
"parent has died unexpectedly");
|
||||
}
|
||||
}
|
||||
buffer.m_used += recvd;
|
||||
break;
|
||||
|
|
|
@ -48,7 +48,8 @@ struct Message
|
|||
MSG_SYNCML_XML,
|
||||
MSG_SYNCML_WBXML,
|
||||
MSG_PASSWORD_REQUEST,
|
||||
MSG_PASSWORD_RESPONSE
|
||||
MSG_PASSWORD_RESPONSE,
|
||||
MSG_SYNC_REPORT
|
||||
} m_type;
|
||||
|
||||
/** length including header */
|
||||
|
@ -150,6 +151,15 @@ class LocalTransportAgent : public TransportAgent
|
|||
*/
|
||||
int m_messageFD;
|
||||
|
||||
/**
|
||||
* Second read/write stream socket for transferring final
|
||||
* status. Same communication method as for m_messageFD.
|
||||
* Necessary because the regular communication
|
||||
* channel needs to be closed in case of a failure, to
|
||||
* notify the peer.
|
||||
*/
|
||||
int m_statusFD;
|
||||
|
||||
/** 0 in client, child PID in server */
|
||||
pid_t m_pid;
|
||||
|
||||
|
@ -170,6 +180,12 @@ class LocalTransportAgent : public TransportAgent
|
|||
* complete message is not overwritten.
|
||||
*/
|
||||
void readMessage(int fd, Buffer &buffer);
|
||||
|
||||
/** utility function for parent: copy child's report into m_clientReport */
|
||||
void receiveChildReport();
|
||||
|
||||
/** utility function for parent: check m_clientReport and log/throw errors */
|
||||
void checkChildReport();
|
||||
};
|
||||
|
||||
SE_END_CXX
|
||||
|
|
Loading…
Reference in a new issue