local sync: better abort handling

When the glib event loop is left because the D-Bus client has
requested an abort, the LocalTransportAgent should simply return a
"failed" status and let the caller handle the abort. The return code
of write/readMessage() must be able to convey that - extended from
boolean to an enum.

SyncContext did not do that correctly in server mode: the check for
abort must be done before giving up by throwing an exception.

The D-Bus test now checks that the right status is recorded (wasn't
the case earlier).
This commit is contained in:
Patrick Ohly 2011-02-16 09:50:43 +01:00
parent 6a89155131
commit 067a370f40
4 changed files with 47 additions and 35 deletions

View File

@ -375,7 +375,7 @@ void LocalTransportAgent::receiveChildReport()
try {
SE_LOG_DEBUG(NULL, NULL, "parent: receiving report");
m_receiveBuffer.m_used = 0;
if (readMessage(statusFD, m_receiveBuffer, deadline())) {
if (readMessage(statusFD, m_receiveBuffer, deadline()) == ACTIVE) {
boost::shared_ptr<std::string> data(new std::string);
data->assign(m_receiveBuffer.m_message->m_data, m_receiveBuffer.m_message->getDataLength());
boost::shared_ptr<StringDataBlob> dump(new StringDataBlob("buffer", data, false));
@ -439,10 +439,10 @@ void LocalTransportAgent::send(const char *data, size_t len)
m_receiveBuffer.m_used -= len;
}
m_sendStartTime = time(NULL);
writeMessage(m_messageFD, m_sendType, data, len, deadline());
m_status = writeMessage(m_messageFD, m_sendType, data, len, deadline());
}
bool LocalTransportAgent::writeMessage(int fd, Message::Type type, const char *data, size_t len, time_t deadline)
TransportAgent::Status LocalTransportAgent::writeMessage(int fd, Message::Type type, const char *data, size_t len, time_t deadline)
{
Message header;
header.m_type = type;
@ -465,7 +465,7 @@ bool LocalTransportAgent::writeMessage(int fd, Message::Type type, const char *d
if (deadline) {
time_t now = time(NULL);
if (now >= deadline) {
return false;
return TIME_OUT;
} else {
timeout.tv_sec = deadline - now;
}
@ -484,7 +484,8 @@ bool LocalTransportAgent::writeMessage(int fd, Message::Type type, const char *d
res = 1;
break;
case GLIB_SELECT_QUIT:
SE_THROW("quit transport as requested as part of GLib event loop");
SE_LOG_DEBUG(NULL, NULL, "quit transport as requested as part of GLib event loop");
return FAILED;
break;
}
} else {
@ -498,7 +499,7 @@ bool LocalTransportAgent::writeMessage(int fd, Message::Type type, const char *d
case 0:
SE_LOG_DEBUG(NULL, NULL, "%s: select timeout",
m_pid ? "parent" : "child");
return false;
return TIME_OUT;
break;
case 1: {
ssize_t sent = writev(fd, vec, 2);
@ -533,7 +534,7 @@ bool LocalTransportAgent::writeMessage(int fd, Message::Type type, const char *d
SE_LOG_DEBUG(NULL, NULL, "%s: sending %ld bytes done",
m_pid ? "parent" : "child",
(long)len);
return true;
return ACTIVE;
}
void LocalTransportAgent::cancel()
@ -548,7 +549,8 @@ TransportAgent::Status LocalTransportAgent::wait(bool noReply)
m_status = INACTIVE;
} else {
if (!m_receiveBuffer.haveMessage()) {
if (readMessage(m_messageFD, m_receiveBuffer, deadline())) {
m_status = readMessage(m_messageFD, m_receiveBuffer, deadline());
if (m_status == ACTIVE) {
// complete message received, check if it is SyncML
switch (m_receiveBuffer.m_message->m_type) {
case Message::MSG_SYNCML_XML:
@ -560,8 +562,6 @@ TransportAgent::Status LocalTransportAgent::wait(bool noReply)
SE_THROW("unsupported message type");
break;
}
} else {
m_status = TIME_OUT;
}
}
}
@ -569,7 +569,7 @@ TransportAgent::Status LocalTransportAgent::wait(bool noReply)
return m_status;
}
bool LocalTransportAgent::readMessage(int fd, Buffer &buffer, time_t deadline)
TransportAgent::Status LocalTransportAgent::readMessage(int fd, Buffer &buffer, time_t deadline)
{
while (!buffer.haveMessage()) {
int res = 0;
@ -580,7 +580,7 @@ bool LocalTransportAgent::readMessage(int fd, Buffer &buffer, time_t deadline)
time_t now = time(NULL);
if (now >= deadline) {
// already too late
return false;
return TIME_OUT;
} else {
timeout.tv_sec = deadline - now;
}
@ -599,7 +599,8 @@ bool LocalTransportAgent::readMessage(int fd, Buffer &buffer, time_t deadline)
res = 1;
break;
case GLIB_SELECT_QUIT:
SE_THROW("quit transport as requested as part of GLib event loop");
SE_LOG_DEBUG(NULL, NULL, "quit transport as requested as part of GLib event loop");
return FAILED;
break;
}
} else {
@ -614,7 +615,7 @@ bool LocalTransportAgent::readMessage(int fd, Buffer &buffer, time_t deadline)
case 0:
SE_LOG_DEBUG(NULL, NULL, "%s: select timeout",
m_pid ? "parent" : "child");
return false;
return TIME_OUT;
break;
case 1: {
// data ready, ensure that buffer is available
@ -671,7 +672,7 @@ bool LocalTransportAgent::readMessage(int fd, Buffer &buffer, time_t deadline)
}
}
return true;
return ACTIVE;
}
void LocalTransportAgent::getReply(const char *&data, size_t &len, std::string &contentType)

View File

@ -165,9 +165,9 @@ class LocalTransportAgent : public TransportAgent
* Write Message with given type into file descriptor.
* Retries until error or all data written.
*
* @return true for success, false if deadline is reached, exception for fatal error
* @return ACTIVE for success, TIME_OUT or FAILED for failure, exception for really bad error
*/
bool writeMessage(int fd, Message::Type type, const char *data, size_t len, time_t deadline);
Status writeMessage(int fd, Message::Type type, const char *data, size_t len, time_t deadline);
/**
* Read bytes into buffer until complete Message
@ -175,9 +175,9 @@ class LocalTransportAgent : public TransportAgent
* end of that Message if available. An existing
* complete message is not overwritten.
*
* @return true for success, false if deadline is reached, exception for fatal error
* @return ACTIVE for success, TIME_OUT or FAILED for failure, exception for really bad error
*/
bool readMessage(int fd, Buffer &buffer, time_t deadline);
Status readMessage(int fd, Buffer &buffer, time_t deadline);
/** utility function for parent: copy child's report into m_clientReport */
void receiveChildReport();

View File

@ -3634,6 +3634,18 @@ SyncMLStatus SyncContext::doSync()
* message sending interval equals m_retryInterval.
*/
case TransportAgent::FAILED: {
// Send might have failed because of abort or
// suspend request.
if (checkForSuspend()) {
SE_LOG_DEBUG(NULL, NULL, "suspending after TransportAgent::FAILED as requested by user");
stepCmd = sysync::STEPCMD_SUSPEND;
break;
} else if (checkForAbort()) {
SE_LOG_DEBUG(NULL, NULL, "aborting after TransportAgent::FAILED as requested by user");
stepCmd = sysync::STEPCMD_ABORT;
break;
}
time_t curTime = time(NULL);
time_t duration = curTime - sendStart;
// same if() as above for TIME_OUT
@ -3646,18 +3658,6 @@ SyncMLStatus SyncContext::doSync()
(long)(duration % 60));
SE_THROW_EXCEPTION(TransportException, "transport failed, retry period exceeded");
} else {
// Send might have failed because of abort or
// suspend request.
if (checkForSuspend()) {
SE_LOG_DEBUG(NULL, NULL, "suspending after TransportAgent::FAILED as requested by user");
stepCmd = sysync::STEPCMD_SUSPEND;
break;
} else if (checkForAbort()) {
SE_LOG_DEBUG(NULL, NULL, "aborting after TransportAgent::FAILED as requested by user");
stepCmd = sysync::STEPCMD_ABORT;
break;
}
// retry send
int leftTime = m_retryInterval - (curTime - resendStart);
if (leftTime >0 ) {

View File

@ -565,7 +565,7 @@ class DBusUtil(Timeout):
updateProps[key] = tmpdict
return updateProps
def checkSync(self, expectedError=0):
def checkSync(self, expectedError=0, expectedResult=0):
# check recorded events in DBusUtil.events, first filter them
statuses = []
progresses = []
@ -620,6 +620,15 @@ class DBusUtil(Timeout):
self.failUnlessEqual(status, "done")
self.failUnlessEqual(error, expectedError)
# now check that report is sane
reports = self.session.GetReports(0, 100, utf8_strings=True)
self.failUnlessEqual(len(reports), 1)
if expectedResult:
self.failUnlessEqual(int(reports[0]["status"]), expectedResult)
else:
self.failUnlessEqual(int(reports[0]["status"]), 200)
self.failIf("error" in reports[0])
return reports[0]
class TestDBusServer(unittest.TestCase, DBusUtil):
"""Tests for the read-only Server API."""
@ -2482,7 +2491,8 @@ END:VCARD''')
self.session.Sync("slow", {})
loop.run()
self.failUnlessEqual(DBusUtil.quit_events, ["session " + self.sessionpath + " done"])
self.checkSync(20017) # transport error
report = self.checkSync(20017, 20043) # sources aborted, transport failure
self.failUnlessEqual(report["error"], "timeout, retry period exceeded")
@timeout(10)
@property("ENV", "SYNCEVOLUTION_LOCAL_CHILD_DELAY=5")
@ -2497,8 +2507,9 @@ END:VCARD''')
self.session.Abort()
loop.run()
self.failUnlessEqual(DBusUtil.quit_events, ["session " + self.sessionpath + " done"])
self.checkSync(20017) # aborted
report = self.checkSync(20017, 20017) # aborted
self.failIf("error" in report) # ... but without error message
self.failUnlessEqual(report["source-addressbook-status"], "0") # unknown status for source (aborted early)
def run(self, result):
self.runTest(result)