diff --git a/MANIFEST.in b/MANIFEST.in index a8e57f1..156be24 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,4 +1,4 @@ include README.md LICENSE global-include CMakeLists.txt *.cmake -recursive-include pylokimq * -recursive-include pybind11/include *.h \ No newline at end of file +recursive-include pyoxenmq * +recursive-include pybind11/include *.h diff --git a/README.md b/README.md index d73ee0c..ccc5a92 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# pylokimq +# pyoxenmq pybind layer for oxenmq @@ -14,8 +14,8 @@ dependencies check the source out (recursive repo): - $ git clone --recursive https://github.com/oxen-io/loki-pylokimq - $ cd loki-pylokimq + $ git clone --recursive https://github.com/oxen-io/oxen-pyoxenmq + $ cd oxen-pyoxenmq build: diff --git a/external/oxen-mq b/external/oxen-mq index faeeaa8..dccbd1e 160000 --- a/external/oxen-mq +++ b/external/oxen-mq @@ -1 +1 @@ -Subproject commit faeeaa86d4ef859e9d93548040f8b724c504e356 +Subproject commit dccbd1e8cdb9f57206077facdf973c8e86fc6bec diff --git a/pyoxenmq/lokimq.cpp b/pyoxenmq/lokimq.cpp deleted file mode 100644 index 2b2e90e..0000000 --- a/pyoxenmq/lokimq.cpp +++ /dev/null @@ -1,255 +0,0 @@ -#include "common.hpp" -#include -#include -#include -#include -#include -#include -#include -#include - -namespace oxenmq -{ - template - std::future> MQ_start_request( - OxenMQ& omq, - ConnectionID conn, - std::string name, - std::vector byte_args, - Options&&... opts) - { - std::vector args; - args.reserve(byte_args.size()); - for (auto& b : byte_args) - args.push_back(b); - - auto result = std::make_shared>>(); - auto fut = result->get_future(); - omq.request(conn, std::move(name), - [result=std::move(result)](bool success, std::vector value) - { - if (success) - result->set_value(std::move(value)); - else - { - std::string err; - for (auto& m : value) { - if (!err.empty()) err += ", "; - err += m; - } - result->set_exception(std::make_exception_ptr(std::runtime_error{"Request failed: " + err})); - } - }, - lokimq::send_option::data_parts(args.begin(), args.end()), - std::forward(opts)... - ); - return fut; - } - - // Binds a stl future. `Conv` is a lambda that converts the future's .get() value into something - // Python-y (it can be the value directly, if the value is convertible to Python already). - template - void bind_future(py::module& m, std::string class_name, Conv conv) - { - py::class_(m, class_name.c_str()) - .def("get", [conv=std::move(conv)](F& f) { return conv(f.get()); }, - "Gets the result (or raises an exception if the result set an exception); must only be called once") - .def("valid", [](F& f) { return f.valid(); }, - "Returns true if the result is available") - .def("wait", &F::wait, - "Waits indefinitely for the result to become available") - .def("wait_for", &F::template wait_for>, - "Waits up to the given timedelta for the result to become available") - .def("wait_for", [](F& f, double seconds) { return f.wait_for(std::chrono::duration{seconds}); }, - "Waits up to the given number of seconds for the result to become available") - .def("wait_until", &F::template wait_until, - "Wait until the given datetime for the result to become available") - ; - } - - static std::mutex log_mutex; - - void - OxenMQ_Init(py::module & mod) - { - using namespace pybind11::literals; - py::class_(mod, "ConnectionID") - .def("__eq__", [](const ConnectionID & self, const ConnectionID & other) { - return self == other; - }); - py::class_(mod, "Message") - .def_readonly("remote", &Message::remote) - .def_readonly("conn", &Message::conn); - - py::class_
(mod, "Address") - .def(py::init()); - py::class_(mod, "TaggedThreadID"); - py::enum_(mod, "LogLevel") - .value("fatal", LogLevel::fatal).value("error", LogLevel::error).value("warn", LogLevel::warn) - .value("info", LogLevel::info).value("debug", LogLevel::debug).value("trace", LogLevel::trace); - - py::enum_(mod, "future_status") - .value("deferred", std::future_status::deferred) - .value("ready", std::future_status::ready) - .value("timeout", std::future_status::timeout); - bind_future>>(mod, "ResultFuture", - [](std::vector bytes) { - py::list l; - for (const auto& v : bytes) - l.append(py::bytes(v)); - return l; - }); - - py::class_(mod, "OxenMQ") - .def(py::init<>()) - .def(py::init([](LogLevel level) { - // Quick and dirty logger that logs to stderr. It would be much nicer to take a python - // function, but that deadlocks pretty much right away because of the crappiness of the gil. - return std::make_unique([] (LogLevel lvl, const char* file, int line, std::string msg) mutable { - std::lock_guard l{log_mutex}; - std::cerr << '[' << lvl << "][" << file << ':' << line << "]: " << msg << "\n"; - }, level); - })) - .def_readwrite("handshake_time", &OxenMQ::HANDSHAKE_TIME) - .def_readwrite("pubkey_base_routing_id", &OxenMQ::PUBKEY_BASED_ROUTING_ID) - .def_readwrite("max_message_size", &OxenMQ::MAX_MSG_SIZE) - .def_readwrite("max_sockets", &OxenMQ::MAX_SOCKETS) - .def_readwrite("reconnect_interval", &OxenMQ::RECONNECT_INTERVAL) - .def_readwrite("close_longer", &OxenMQ::CLOSE_LINGER) - .def_readwrite("connection_check_interval", &OxenMQ::CONN_CHECK_INTERVAL) - .def_readwrite("connection_heartbeat", &OxenMQ::CONN_HEARTBEAT) - .def_readwrite("connection_heartbeat_timeout", &OxenMQ::CONN_HEARTBEAT_TIMEOUT) - .def_readwrite("startup_umask", &OxenMQ::STARTUP_UMASK) - .def("start", &OxenMQ::start) - .def("listen_plain", - [](OxenMQ & self, std::string path) { - self.listen_plain(path); - }) - .def("listen_curve", - [](OxenMQ & self, std::string path) { - self.listen_curve(path); - }) - .def("add_tagged_thread", - [](OxenMQ & self, std::string name) { - return self.add_tagged_thread(name); - }) - .def("add_timer", - [](OxenMQ & self, std::chrono::milliseconds interval, std::function callback) { - self.add_timer(callback, interval); - }) - .def("call_soon", - [](OxenMQ & self, std::function job, std::optional thread) - { - self.job(std::move(job), std::move(thread)); - }) - .def("add_anonymous_category", - [](OxenMQ & self, std::string name) - { - self.add_category(std::move(name), AuthLevel::none); - }) - .def("add_request_command", - [](OxenMQ &self, - std::string category, - std::string name, - py::function handler) - { - self.add_request_command(category, name, - [handler](Message & msg) { - std::string result; - { - py::gil_scoped_acquire gil; - - std::vector data; - for (auto& arg : msg.data) - { - data.emplace_back(arg.begin(), arg.size()); - } - - try - { - const auto obj = handler(data); - result = py::str(obj); - } - catch(std::exception & ex) - { - PyErr_SetString(PyExc_RuntimeError, ex.what()); - } - } - msg.send_reply(result); - }); - }) - .def("add_request_command_ex", - [](OxenMQ &self, - std::string category, - std::string name, - py::function handler) - { - self.add_request_command(category, name, - [handler](Message & msg) { - std::string result; - { - py::gil_scoped_acquire gil; - - std::vector data; - for (auto& arg : msg.data) - { - data.emplace_back(arg.begin(), arg.size()); - } - - try - { - const auto obj = handler(data, msg); - result = py::str(obj); - } - catch(std::exception & ex) - { - PyErr_SetString(PyExc_RuntimeError, ex.what()); - } - } - msg.send_reply(result); - }); - }) - .def("connect_remote", - [](OxenMQ & self, - std::string remote) -> ConnectionID - { - std::promise promise; - self.connect_remote( - remote, - [&promise](ConnectionID id) { promise.set_value(std::move(id)); }, - [&promise](auto, std::string_view reason) { - promise.set_exception(std::make_exception_ptr( - std::runtime_error{"Connection failed: " + std::string{reason}})); - }); - return promise.get_future().get(); - }) - .def("request", - [](OxenMQ & self, - ConnectionID conn, - std::string name, - std::vector args, - std::optional timeout) -> py::list - { - py::list l; - for (auto& s : LokiMQ_start_request(self, conn, std::move(name), std::move(args), - lokimq::send_option::request_timeout{timeout ? std::chrono::milliseconds(long(*timeout * 1000)) : DEFAULT_REQUEST_TIMEOUT} - ).get()) - l.append(py::bytes(s)); - return l; - }, - "conn"_a, "name"_a, "args"_a = std::vector{}, "timeout"_a = py::none{}) - .def("request_future", - [](OxenMQ & self, - ConnectionID conn, - std::string name, - std::vector args, - std::optional timeout) -> std::future> - { - return LokiMQ_start_request(self, conn, std::move(name), std::move(args), - lokimq::send_option::request_timeout{timeout ? std::chrono::milliseconds(long(*timeout * 1000)) : DEFAULT_REQUEST_TIMEOUT} - ); - }, - "conn"_a, "name"_a, "args"_a = std::vector{}, "timeout"_a = py::none{}) - ; - } -} diff --git a/pyoxenmq/oxenmq.cpp b/pyoxenmq/oxenmq.cpp index d70d1e4..89b1e6a 100644 --- a/pyoxenmq/oxenmq.cpp +++ b/pyoxenmq/oxenmq.cpp @@ -111,7 +111,7 @@ namespace oxenmq }, level); })) .def_readwrite("handshake_time", &OxenMQ::HANDSHAKE_TIME) - .def_readwrite("pubkey_base_routing_id", &OxenMQ::PUBKEY_BASED_ROUTING_ID) + .def_readwrite("ephemeral_routing_id", &OxenMQ::EPHEMERAL_ROUTING_ID) .def_readwrite("max_message_size", &OxenMQ::MAX_MSG_SIZE) .def_readwrite("max_sockets", &OxenMQ::MAX_SOCKETS) .def_readwrite("reconnect_interval", &OxenMQ::RECONNECT_INTERVAL) @@ -178,37 +178,6 @@ namespace oxenmq msg.send_reply(result); }); }) - .def("add_request_command_ex", - [](OxenMQ &self, - std::string category, - std::string name, - py::function handler) - { - self.add_request_command(category, name, - [handler](Message & msg) { - std::string result; - { - py::gil_scoped_acquire gil; - - std::vector data; - for (auto& arg : msg.data) - { - data.emplace_back(arg.begin(), arg.size()); - } - - try - { - const auto obj = handler(data, msg); - result = py::str(obj); - } - catch(std::exception & ex) - { - PyErr_SetString(PyExc_RuntimeError, ex.what()); - } - } - msg.send_reply(result); - }); - }) .def("connect_remote", [](OxenMQ & self, std::string remote) -> ConnectionID