syncevo-dbus-server: implemented syncs via Connection API

At the moment, only running as SyncML client with some
HTTP SyncML server works. It can be triggered by sending
a special message with type "HTTP Config" and the configuration
name as content. dbus-server-http.py can be used to test
this.

The communication between engine and Connection is established
by the new DBusTransportAgent. Except for the limitation
to HTTP client sessions, the Connection API should now
be implemented completely.
This commit is contained in:
Patrick Ohly 2009-09-17 21:07:55 +02:00
parent 88f35083a5
commit c2114a7b85
2 changed files with 430 additions and 38 deletions

View File

@ -60,6 +60,7 @@ public:
class Session;
class Connection;
class Client;
class DBusTransportAgent;
/**
* Implements the read-only methods in a Session and the Server.
@ -212,6 +213,9 @@ public:
const Caller_t &ID,
const boost::shared_ptr<Watch> &watch);
/** detach this resource from all clients which own it */
void detach(Resource *resource);
/**
* Enqueue a session. Might also make it ready immediately,
* if nothing else is first in the queue. To be called
@ -295,6 +299,25 @@ public:
detach(resource.get());
}
/**
* Remove all references to the given resource, regardless whether
* it was referenced not at all or multiple times.
*/
void detachAll(Resource *resource) {
Resources_t::iterator it = m_resources.begin();
while (it != m_resources.end()) {
if (it->get() == resource) {
it = m_resources.erase(it);
} else {
++it;
}
}
}
void detachAll(boost::shared_ptr<Resource> resource)
{
detachAll(resource.get());
}
/**
* return corresponding smart pointer for a certain resource,
* empty pointer if not found
@ -370,6 +393,7 @@ class DBusSync : public SyncContext
public:
DBusSync(const std::string &config,
Session &session);
~DBusSync() {}
virtual boost::shared_ptr<TransportAgent> createTransportAgent();
virtual void displaySyncProgress(sysync::TProgressEventEnum type,
@ -394,6 +418,8 @@ class Session : public DBusObjectHelper,
{
DBusServer &m_server;
boost::weak_ptr<Connection> m_connection;
std::string m_connectionError;
bool m_useConnection;
/** temporary config changes */
FilterConfigNode::ConfigFilter m_syncFilter;
@ -438,10 +464,6 @@ class Session : public DBusObjectHelper,
void setConfig(bool update, bool clear, bool temporary,
const ReadOperations::Config_t &config);
typedef std::map<std::string, std::string> SourceModes_t;
void sync(const std::string &mode, const SourceModes_t &source_modes);
void abort();
void suspend();
void getStatus(std::string &status,
uint32_t &error,
SourceStatuses_t &sources);
@ -486,8 +508,25 @@ public:
void setPriority(int priority) { m_priority = priority; }
int getPriority() const { return m_priority; }
void setConnection(const boost::weak_ptr<Connection> c) { m_connection = c; }
void setConnection(const boost::shared_ptr<Connection> c) { m_connection = c; m_useConnection = c; }
boost::weak_ptr<Connection> getConnection() { return m_connection; }
bool useConnection() { return m_useConnection; }
/**
* After the connection closes, the Connection instance is
* destructed immediately. This is necessary so that the
* corresponding cleanup can remove all other classes
* only referenced by the Connection.
*
* This leads to the problem that an active sync cannot
* query the final error code of the connection. This
* is solved by setting a generic error code here when
* the sync starts and overwriting it when the connection
* closes.
*/
void setConnectionError(const std::string error) { m_connectionError = error; }
std::string getConnectionError() { return m_connectionError; }
DBusServer &getServer() { return m_server; }
@ -518,6 +557,11 @@ public:
void sourceProgress(sysync::TProgressEventEnum type,
SyncSource &source,
int32_t extra1, int32_t extra2, int32_t extra3);
typedef std::map<std::string, std::string> SourceModes_t;
void sync(const std::string &mode, const SourceModes_t &source_modes);
void abort();
void suspend();
};
@ -551,6 +595,19 @@ class Connection : public DBusObjectHelper, public Resource
const uint32_t m_sessionNum;
boost::shared_ptr<Session> m_session;
/**
* main loop that our DBusTransportAgent is currently waiting in,
* NULL if not waiting
*/
GMainLoop *m_loop;
/**
* buffer for received data, waiting here for engine to ask
* for it via DBusTransportAgent::getReply().
*/
SharedBuffer m_incomingMsg;
std::string m_incomingMsgType;
/**
* records the reason for the failure, sends Abort signal and puts
* the connection into the FAILED state.
@ -575,6 +632,8 @@ class Connection : public DBusObjectHelper, public Resource
bool,
uint32_t> reply;
friend class DBusTransportAgent;
public:
const std::string m_description;
@ -588,7 +647,57 @@ public:
void activate();
/** session requested by us is ready to run a sync */
void ready();
/** connection is no longer needed, ensure that it gets deleted */
void shutdown();
};
/**
* A proxy for a Connection instance. The Connection instance can go
* away (weak pointer, must be locked and and checked each time it is
* needed). The agent must remain available as long as the engine
* needs and basically becomes unusuable once the connection dies.
*
* Reconnecting is not currently supported.
*/
class DBusTransportAgent : public TransportAgent
{
GMainLoop *m_loop;
Session &m_session;
boost::weak_ptr<Connection> m_connection;
std::string m_url;
std::string m_type;
TransportCallback m_callback;
void *m_callbackData;
int m_callbackInterval;
SharedBuffer m_incomingMsg;
std::string m_incomingMsgType;
void doWait(boost::shared_ptr<Connection> &connection);
public:
DBusTransportAgent(GMainLoop *loop,
Session &session,
boost::weak_ptr<Connection> connection);
~DBusTransportAgent();
virtual void setURL(const std::string &url) { m_url = url; }
virtual void setContentType(const std::string &type) { m_type = type; }
virtual void send(const char *data, size_t len);
virtual void cancel() {}
virtual void shutdown();
virtual Status wait(bool noReply = false);
virtual void setCallback (TransportCallback cb, void * udata, int interval)
{
m_callback = cb;
m_callbackData = udata;
m_callbackInterval = interval;
}
virtual void getReply(const char *&data, size_t &len, std::string &contentType);
};
/***************** ReadOperations implementation ****************/
@ -615,20 +724,26 @@ void ReadOperations::getReports(uint32_t start, uint32_t count,
DBusSync::DBusSync(const std::string &config,
Session &session) :
SyncContext(config),
SyncContext(config, true),
m_session(session)
{
}
boost::shared_ptr<TransportAgent> DBusSync::createTransportAgent()
{
// TODO: check if we have a connection set in the session
// and if so, use it
// no connection, use HTTP via libsoup/GMainLoop
GMainLoop *loop = m_session.getServer().getLoop();
g_main_loop_ref(loop);
return boost::shared_ptr<TransportAgent>(new SoupTransportAgent(loop));
if (m_session.useConnection()) {
// use the D-Bus Connection to send and receive messages
return boost::shared_ptr<TransportAgent>(new DBusTransportAgent(m_session.getServer().getLoop(),
m_session,
m_session.getConnection()));
} else {
// no connection, use HTTP via libsoup/GMainLoop
GMainLoop *loop = m_session.getServer().getLoop();
g_main_loop_ref(loop);
boost::shared_ptr<HTTPTransportAgent> agent(new SoupTransportAgent(loop));
agent->setConfig(*this);
return agent;
}
}
void DBusSync::displaySyncProgress(sysync::TProgressEventEnum type,
@ -793,6 +908,7 @@ Session::Session(DBusServer &server,
"org.syncevolution.Session"),
ReadOperations(config_name),
m_server(server),
m_useConnection(false),
m_active(false),
m_done(false),
m_abort(false),
@ -908,6 +1024,12 @@ void Session::run()
}
m_done = true;
fireStatus(true);
// if there is a connection, then it is no longer needed
boost::shared_ptr<Connection> c = m_connection.lock();
if (c) {
c->shutdown();
}
}
}
@ -981,23 +1103,44 @@ void Connection::process(const Caller_t &caller,
switch (m_state) {
case SETUP: {
// TODO: check message type, determine whether we act
// as client or server, choose config, create Session, ...
std::string config;
// check message type, determine whether we act
// as client or server, choose config
if (message_type == "HTTP Config") {
// type used for testing, payload is config name
config.assign(reinterpret_cast<const char *>(message.second),
message.first);
} else {
throw runtime_error("message type not supported for starting a sync");
}
// For the time being, request a session, then when it
// is ready, send a dummy reply.
// run session as client (server not supported yet)
m_state = PROCESSING;
m_session.reset(new Session(m_server,
"default",
config,
m_sessionNum));
m_session->setPriority(Session::PRI_CONNECTION);
m_session->setConnection(myself);
// this will be reset only when the connection shuts down okay
// or overwritten with the error given to us in
// Connection::close()
m_session->setConnectionError("closed prematurely");
m_server.enqueue(m_session);
break;
}
case PROCESSING:
throw std::runtime_error("protocol error: already processing a message");
break;
case WAITING:
throw std::runtime_error("not implemented yet");
// TODO: pass message to session
m_incomingMsg = SharedBuffer(reinterpret_cast<const char *>(message.second),
message.first);
m_incomingMsgType = message_type;
m_state = PROCESSING;
// get out of DBusTransportAgent::wait()
if (m_loop) {
g_main_loop_quit(m_loop);
m_loop = NULL;
}
break;
case FINAL:
case DONE:
@ -1029,11 +1172,14 @@ void Connection::close(const Caller_t &caller,
if (!normal ||
m_state != FINAL) {
failed(error.empty() ?
"connection closed unexpectedly" :
error);
std::string err = error.empty() ?
"connection closed unexpectedly" :
error;
m_session->setConnectionError(err);
failed(err);
} else {
m_state = DONE;
m_session->setConnectionError("");
}
// remove reference to us from client, will destruct *this*
@ -1041,6 +1187,13 @@ void Connection::close(const Caller_t &caller,
client->detach(this);
}
void Connection::shutdown()
{
// trigger removal of this connection by removing all
// references to it
m_session->getServer().detach(this);
}
Connection::Connection(DBusServer &server,
const DBusConnectionPtr &conn,
uint32_t session_num,
@ -1054,6 +1207,7 @@ Connection::Connection(DBusServer &server,
m_mustAuthenticate(must_authenticate),
m_state(SETUP),
m_sessionNum(session_num),
m_loop(NULL),
abort(*this, "Abort"),
reply(*this, "Reply"),
m_description(buildDescription(peer))
@ -1066,11 +1220,22 @@ Connection::~Connection()
m_state == DONE ? ", normal shutdown" : " unexpectedly",
m_failure.empty() ? "" : ": ",
m_failure.c_str());
if (m_state != DONE) {
abort();
try {
if (m_state != DONE) {
abort();
}
// DBusTransportAgent waiting? Wake it up.
if (m_loop) {
g_main_loop_quit(m_loop);
m_loop = NULL;
}
m_session.use_count();
m_session.reset();
} catch (...) {
// log errors, but do not propagate them because we are
// destructing
Exception::handle();
}
m_session.use_count();
m_session.reset();
}
void Connection::activate()
@ -1105,20 +1270,151 @@ void Connection::activate()
void Connection::ready()
{
// TODO: proceed with sync now that our session is ready
// proceed with sync now that our session is ready
m_session->sync("", Session::SourceModes_t());
}
// dummy reply
m_state = WAITING;
const char msg[] = "hello world";
try {
reply(std::make_pair(sizeof(msg) - 1, (const uint8_t *)msg),
"dummy_type", std::map<std::string, std::string>(), true, m_sessionNum);
} catch (...) {
failed("sending reply failed");
throw;
/****************** DBusTransportAgent implementation **************/
DBusTransportAgent::DBusTransportAgent(GMainLoop *loop,
Session &session,
boost::weak_ptr<Connection> connection) :
m_loop(loop),
m_session(session),
m_connection(connection)
{
}
DBusTransportAgent::~DBusTransportAgent()
{
boost::shared_ptr<Connection> connection = m_connection.lock();
if (connection) {
connection->shutdown();
}
}
void DBusTransportAgent::send(const char *data, size_t len)
{
boost::shared_ptr<Connection> connection = m_connection.lock();
if (!connection) {
SE_THROW_EXCEPTION(TransportException,
"D-Bus peer has disconnected");
}
if (connection->m_state != Connection::PROCESSING) {
SE_THROW_EXCEPTION(TransportException,
"cannot send to our D-Bus peer");
}
// Change state in advance. If we fail while replying, then all
// further resends will fail with the error above.
connection->m_state = Connection::WAITING;
connection->m_incomingMsg = SharedBuffer();
// TODO: turn D-Bus exceptions into transport exceptions
std::map<std::string, std::string> meta;
meta["URL"] = m_url;
connection->reply(std::make_pair(len, reinterpret_cast<const uint8_t *>(data)),
m_type, meta, false, connection->m_sessionNum);
}
void DBusTransportAgent::shutdown()
{
boost::shared_ptr<Connection> connection = m_connection.lock();
if (!connection) {
SE_THROW_EXCEPTION(TransportException,
"D-Bus peer has disconnected");
}
// send final, empty message and wait for close
connection->m_state = Connection::FINAL;
connection->reply(std::pair<size_t, const uint8_t *>(0, 0),
"", std::map<std::string, std::string>(),
true, connection->m_sessionNum);
}
void DBusTransportAgent::doWait(boost::shared_ptr<Connection> &connection)
{
// let Connection wake us up when it has a reply or
// when it closes down
connection->m_loop = m_loop;
// release our reference so that the Connection instance can
// be destructed when requested by the D-Bus peer
connection.reset();
// TODO: setup regular callback
// now wait
g_main_loop_run(m_loop);
}
DBusTransportAgent::Status DBusTransportAgent::wait(bool noReply)
{
boost::shared_ptr<Connection> connection = m_connection.lock();
if (!connection) {
SE_THROW_EXCEPTION(TransportException,
"D-Bus peer has disconnected");
}
switch (connection->m_state) {
case Connection::PROCESSING:
m_incomingMsg = connection->m_incomingMsg;
m_incomingMsgType = connection->m_incomingMsgType;
return GOT_REPLY;
break;
case Connection::FINAL:
doWait(connection);
// if the connection is still available, then keep waiting
connection = m_connection.lock();
if (connection) {
return ACTIVE;
} else if (m_session.getConnectionError().empty()) {
return INACTIVE;
} else {
SE_THROW_EXCEPTION(TransportException, m_session.getConnectionError());
return FAILED;
}
break;
case Connection::WAITING:
if (noReply) {
// message is sent as far as we know, so return
return INACTIVE;
}
doWait(connection);
// tell caller to check again
return ACTIVE;
break;
case Connection::DONE:
if (!noReply) {
SE_THROW_EXCEPTION(TransportException,
"internal error: transport has shut down, can no longer receive reply");
}
return CLOSED;
default:
SE_THROW_EXCEPTION(TransportException,
"internal error: send() on connection which is not ready");
break;
}
return FAILED;
}
void DBusTransportAgent::getReply(const char *&data, size_t &len, std::string &contentType)
{
data = m_incomingMsg.get();
len = m_incomingMsg.size();
contentType = m_incomingMsgType;
}
/********************** DBusServer implementation ******************/
void DBusServer::clientGone(Client *c)
@ -1322,6 +1618,15 @@ boost::shared_ptr<Client> DBusServer::addClient(const DBusConnectionPtr &conn,
return client;
}
void DBusServer::detach(Resource *resource)
{
BOOST_FOREACH(const Clients_t::value_type &client_entry,
m_clients) {
client_entry.second->detachAll(resource);
}
}
void DBusServer::enqueue(const boost::shared_ptr<Session> &session)
{
WorkQueue_t::iterator it = m_workQueue.end();

87
test/dbus-server-http.py Executable file
View File

@ -0,0 +1,87 @@
#! /usr/bin/python
'''Usage: dbus-server-http.py <server name>
Runs a sync session with an HTTP SyncML server
configured in <server name> config.'''
import dbus
from dbus.mainloop.glib import DBusGMainLoop
import gobject
import sys
import httplib, urlparse
DBusGMainLoop(set_as_default=True)
bus = dbus.SessionBus()
object = dbus.Interface(bus.get_object('org.syncevolution',
'/org/syncevolution/Server'),
'org.syncevolution.Server')
conpath = object.Connect({'description': 'dbus-server-connection.py',
'transport': 'HTTP'},
False,
0)
connection = dbus.Interface(bus.get_object('org.syncevolution',
conpath),
'org.syncevolution.Connection')
loop = gobject.MainLoop()
def Abort():
print "connection went down"
def Reply(data, type, meta, final, session):
try:
if final:
print "closing connection"
connection.Close(True, "")
else:
print ("send %d bytes of type %s, " % (len(data), type)), meta
url = urlparse.urlparse(meta["URL"])
if url.scheme == "http":
conn = httplib.HTTPConnection(url.netloc)
elif url.scheme == "https":
conn = httplib.HTTPSConnection(url.netloc)
else:
raise "invalid scheme " + url.scheme
conn.request("POST",
url.path,
data,
{"Content-type": type})
resp = conn.getresponse()
reply = resp.read()
print "received %d bytes of type %s" % (len(data), type)
replytype = resp.getheader("Content-type")
connection.Process(reply, replytype)
except Exception as ex:
print ex
loop.quit()
def SessionChanged(object, ready):
print "SessionChanged:", object, ready
if not ready:
loop.quit()
bus.add_signal_receiver(Abort,
'Abort',
'org.syncevolution.Connection',
'org.syncevolution',
conpath,
byte_arrays=True)
bus.add_signal_receiver(Reply,
'Reply',
'org.syncevolution.Connection',
'org.syncevolution',
conpath,
byte_arrays=True)
bus.add_signal_receiver(SessionChanged,
'SessionChanged',
'org.syncevolution.Server',
'org.syncevolution',
None,
byte_arrays=True)
# start a test session with an HTTP server
connection.Process(sys.argv[1], "HTTP Config")
loop.run()