From a98722a7dafb29694f090f1cd6c912b017611fe1 Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Thu, 21 Oct 2021 21:03:19 -0300 Subject: [PATCH 01/11] pyoxenmq overhaul - Wrap much more of the OxenMQ - Add extensive pydoc to everything - Switch setup.py to pybind's built-in tools --- CMakeLists.txt | 42 +- MANIFEST.in | 2 +- pyoxenmq/CMakeLists.txt | 10 - pyoxenmq/bencode.cpp | 16 - pyoxenmq/module.cpp | 7 - pyoxenmq/oxenmq.cpp | 224 -------- pyproject.toml | 8 + setup.py | 89 +--- src/bencode.cpp | 15 + {pyoxenmq => src}/common.hpp | 10 +- src/module.cpp | 6 + src/oxenmq.cpp | 953 +++++++++++++++++++++++++++++++++++ 12 files changed, 1035 insertions(+), 347 deletions(-) delete mode 100644 pyoxenmq/CMakeLists.txt delete mode 100644 pyoxenmq/bencode.cpp delete mode 100644 pyoxenmq/module.cpp delete mode 100644 pyoxenmq/oxenmq.cpp create mode 100644 pyproject.toml create mode 100644 src/bencode.cpp rename {pyoxenmq => src}/common.hpp (57%) create mode 100644 src/module.cpp create mode 100644 src/oxenmq.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 0b557af..3d3cf65 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,30 +1,34 @@ cmake_minimum_required(VERSION 3.10) # bionic's cmake version -# Has to be set before `project()`, and ignored on non-macos: -set(CMAKE_OSX_DEPLOYMENT_TARGET 10.13 CACHE STRING "macOS deployment target (Apple clang only)") find_program(CCACHE_PROGRAM ccache) if(CCACHE_PROGRAM) set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE "${CCACHE_PROGRAM}") endif() -set(PROJECT_NAME pyoxenmq) -project(${PROJECT_NAME} - VERSION 0.1.0 - DESCRIPTION "pybind layer for oxenmq" +project(pyoxenmq + VERSION 1.0.0 + DESCRIPTION "python interface to OxenMQ" LANGUAGES CXX) +add_subdirectory(pybind11) -if(NOT CMAKE_BUILD_TYPE) - set(CMAKE_BUILD_TYPE RelWithDebInfo) +pybind11_add_module(pyoxenmq MODULE + bencode.cpp + oxenmq.cpp + module.cpp + ) + +target_compile_features(pyoxenmq PRIVATE cxx_std_17) + +if(TARGET oxenmq::oxenmq) + target_link_libraries(pyoxenmq PRIVATE oxenmq::oxenmq) +else() + include(FindPkgConfig) + pkg_check_modules(oxenmq REQUIRED IMPORTED_TARGET liboxenmq>=1.2.8) + target_link_libraries(pyoxenmq PUBLIC PkgConfig::oxenmq) endif() -include(CheckCXXSourceCompiles) -include(CheckLibraryExists) -set(CMAKE_CXX_STANDARD 17) -set(CMAKE_CXX_STANDARD_REQUIRED ON) -set(CMAKE_CXX_EXTENSIONS OFF) - option(SUBMODULE_CHECK "Enables checking that vendored library submodules are up to date" ON) if(SUBMODULE_CHECK) find_package(Git) @@ -45,11 +49,15 @@ if(SUBMODULE_CHECK) endif() endif() -add_compile_options(-Wno-deprecated-declarations) - add_subdirectory(external/pybind11) include(FindPkgConfig) pkg_check_modules(oxenmq REQUIRED IMPORTED_TARGET GLOBAL liboxenmq) -add_subdirectory(pyoxenmq) +pybind11_add_module(oxenmq MODULE + src/bencode.cpp + src/bencode.cppoxenmq.cpp + src/module.cpp +) + +target_link_libraries(oxenmq PUBLIC PkgConfig::oxenmq) diff --git a/MANIFEST.in b/MANIFEST.in index 156be24..99c01b3 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,4 +1,4 @@ include README.md LICENSE global-include CMakeLists.txt *.cmake -recursive-include pyoxenmq * +recursive-include src * recursive-include pybind11/include *.h diff --git a/pyoxenmq/CMakeLists.txt b/pyoxenmq/CMakeLists.txt deleted file mode 100644 index efd2e32..0000000 --- a/pyoxenmq/CMakeLists.txt +++ /dev/null @@ -1,10 +0,0 @@ - -pybind11_add_module(pyoxenmq MODULE - bencode.cpp - oxenmq.cpp - module.cpp -) - -target_link_libraries(pyoxenmq PUBLIC PkgConfig::oxenmq) - - diff --git a/pyoxenmq/bencode.cpp b/pyoxenmq/bencode.cpp deleted file mode 100644 index c63d9f7..0000000 --- a/pyoxenmq/bencode.cpp +++ /dev/null @@ -1,16 +0,0 @@ -#include "common.hpp" -#include "oxenmq/base32z.h" - -namespace oxenmq -{ - void - BEncode_Init(py::module & mod) - { - mod.def("base32z_encode", [](py::bytes data) { - char * ptr = nullptr; - py::ssize_t sz = 0; - PyBytes_AsStringAndSize(data.ptr(), &ptr, &sz); - return oxenmq::to_base32z(ptr, ptr+sz); - }); - } -} diff --git a/pyoxenmq/module.cpp b/pyoxenmq/module.cpp deleted file mode 100644 index 24842f6..0000000 --- a/pyoxenmq/module.cpp +++ /dev/null @@ -1,7 +0,0 @@ -#include "common.hpp" - -PYBIND11_MODULE(pyoxenmq, m) -{ - oxenmq::OxenMQ_Init(m); - oxenmq::BEncode_Init(m); -} diff --git a/pyoxenmq/oxenmq.cpp b/pyoxenmq/oxenmq.cpp deleted file mode 100644 index 89b1e6a..0000000 --- a/pyoxenmq/oxenmq.cpp +++ /dev/null @@ -1,224 +0,0 @@ -#include "common.hpp" -#include -#include -#include -#include -#include -#include -#include -#include - -namespace oxenmq -{ - template - std::future> MQ_start_request( - OxenMQ& omq, - ConnectionID conn, - std::string name, - std::vector byte_args, - Options&&... opts) - { - std::vector args; - args.reserve(byte_args.size()); - for (auto& b : byte_args) - args.push_back(b); - - auto result = std::make_shared>>(); - auto fut = result->get_future(); - omq.request(conn, std::move(name), - [result=std::move(result)](bool success, std::vector value) - { - if (success) - result->set_value(std::move(value)); - else - { - std::string err; - for (auto& m : value) { - if (!err.empty()) err += ", "; - err += m; - } - result->set_exception(std::make_exception_ptr(std::runtime_error{"Request failed: " + err})); - } - }, - oxenmq::send_option::data_parts(args.begin(), args.end()), - std::forward(opts)... - ); - return fut; - } - - // Binds a stl future. `Conv` is a lambda that converts the future's .get() value into something - // Python-y (it can be the value directly, if the value is convertible to Python already). - template - void bind_future(py::module& m, std::string class_name, Conv conv) - { - py::class_(m, class_name.c_str()) - .def("get", [conv=std::move(conv)](F& f) { return conv(f.get()); }, - "Gets the result (or raises an exception if the result set an exception); must only be called once") - .def("valid", [](F& f) { return f.valid(); }, - "Returns true if the result is available") - .def("wait", &F::wait, - "Waits indefinitely for the result to become available") - .def("wait_for", &F::template wait_for>, - "Waits up to the given timedelta for the result to become available") - .def("wait_for", [](F& f, double seconds) { return f.wait_for(std::chrono::duration{seconds}); }, - "Waits up to the given number of seconds for the result to become available") - .def("wait_until", &F::template wait_until, - "Wait until the given datetime for the result to become available") - ; - } - - static std::mutex log_mutex; - - void - OxenMQ_Init(py::module & mod) - { - using namespace pybind11::literals; - py::class_(mod, "ConnectionID") - .def("__eq__", [](const ConnectionID & self, const ConnectionID & other) { - return self == other; - }); - py::class_(mod, "Message") - .def_readonly("remote", &Message::remote) - .def_readonly("conn", &Message::conn); - - py::class_
(mod, "Address") - .def(py::init()); - py::class_(mod, "TaggedThreadID"); - py::enum_(mod, "LogLevel") - .value("fatal", LogLevel::fatal).value("error", LogLevel::error).value("warn", LogLevel::warn) - .value("info", LogLevel::info).value("debug", LogLevel::debug).value("trace", LogLevel::trace); - - py::enum_(mod, "future_status") - .value("deferred", std::future_status::deferred) - .value("ready", std::future_status::ready) - .value("timeout", std::future_status::timeout); - bind_future>>(mod, "ResultFuture", - [](std::vector bytes) { - py::list l; - for (const auto& v : bytes) - l.append(py::bytes(v)); - return l; - }); - - py::class_(mod, "OxenMQ") - .def(py::init<>()) - .def(py::init([](LogLevel level) { - // Quick and dirty logger that logs to stderr. It would be much nicer to take a python - // function, but that deadlocks pretty much right away because of the crappiness of the gil. - return std::make_unique([] (LogLevel lvl, const char* file, int line, std::string msg) mutable { - std::lock_guard l{log_mutex}; - std::cerr << '[' << lvl << "][" << file << ':' << line << "]: " << msg << "\n"; - }, level); - })) - .def_readwrite("handshake_time", &OxenMQ::HANDSHAKE_TIME) - .def_readwrite("ephemeral_routing_id", &OxenMQ::EPHEMERAL_ROUTING_ID) - .def_readwrite("max_message_size", &OxenMQ::MAX_MSG_SIZE) - .def_readwrite("max_sockets", &OxenMQ::MAX_SOCKETS) - .def_readwrite("reconnect_interval", &OxenMQ::RECONNECT_INTERVAL) - .def_readwrite("close_longer", &OxenMQ::CLOSE_LINGER) - .def_readwrite("connection_check_interval", &OxenMQ::CONN_CHECK_INTERVAL) - .def_readwrite("connection_heartbeat", &OxenMQ::CONN_HEARTBEAT) - .def_readwrite("connection_heartbeat_timeout", &OxenMQ::CONN_HEARTBEAT_TIMEOUT) - .def_readwrite("startup_umask", &OxenMQ::STARTUP_UMASK) - .def("start", &OxenMQ::start) - .def("listen_plain", - [](OxenMQ & self, std::string path) { - self.listen_plain(path); - }) - .def("listen_curve", - [](OxenMQ & self, std::string path) { - self.listen_curve(path); - }) - .def("add_tagged_thread", - [](OxenMQ & self, std::string name) { - return self.add_tagged_thread(name); - }) - .def("add_timer", - [](OxenMQ & self, std::chrono::milliseconds interval, std::function callback) { - self.add_timer(callback, interval); - }) - .def("call_soon", - [](OxenMQ & self, std::function job, std::optional thread) - { - self.job(std::move(job), std::move(thread)); - }) - .def("add_anonymous_category", - [](OxenMQ & self, std::string name) - { - self.add_category(std::move(name), AuthLevel::none); - }) - .def("add_request_command", - [](OxenMQ &self, - std::string category, - std::string name, - py::function handler) - { - self.add_request_command(category, name, - [handler](Message & msg) { - std::string result; - { - py::gil_scoped_acquire gil; - - std::vector data; - for (auto& arg : msg.data) - { - data.emplace_back(arg.begin(), arg.size()); - } - - try - { - const auto obj = handler(data); - result = py::str(obj); - } - catch(std::exception & ex) - { - PyErr_SetString(PyExc_RuntimeError, ex.what()); - } - } - msg.send_reply(result); - }); - }) - .def("connect_remote", - [](OxenMQ & self, - std::string remote) -> ConnectionID - { - std::promise promise; - self.connect_remote( - remote, - [&promise](ConnectionID id) { promise.set_value(std::move(id)); }, - [&promise](auto, std::string_view reason) { - promise.set_exception(std::make_exception_ptr( - std::runtime_error{"Connection failed: " + std::string{reason}})); - }); - return promise.get_future().get(); - }) - .def("request", - [](OxenMQ & self, - ConnectionID conn, - std::string name, - std::vector args, - std::optional timeout) -> py::list - { - py::list l; - for (auto& s : MQ_start_request(self, conn, std::move(name), std::move(args), - oxenmq::send_option::request_timeout{timeout ? std::chrono::milliseconds(long(*timeout * 1000)) : DEFAULT_REQUEST_TIMEOUT} - ).get()) - l.append(py::bytes(s)); - return l; - }, - "conn"_a, "name"_a, "args"_a = std::vector{}, "timeout"_a = py::none{}) - .def("request_future", - [](OxenMQ & self, - ConnectionID conn, - std::string name, - std::vector args, - std::optional timeout) -> std::future> - { - return MQ_start_request(self, conn, std::move(name), std::move(args), - oxenmq::send_option::request_timeout{timeout ? std::chrono::milliseconds(long(*timeout * 1000)) : DEFAULT_REQUEST_TIMEOUT} - ); - }, - "conn"_a, "name"_a, "args"_a = std::vector{}, "timeout"_a = py::none{}) - ; - } -} diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..ab05f30 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,8 @@ +[build-system] +requires = [ + "setuptools>=42", + "wheel", + "pybind11>=2.4.0", +] + +build-backend = "setuptools.build_meta" diff --git a/setup.py b/setup.py index ef334fb..c6373f1 100644 --- a/setup.py +++ b/setup.py @@ -1,74 +1,31 @@ -import os -import re -import sys -import platform -import subprocess +from setuptools import setup -from setuptools import setup, Extension -from setuptools.command.build_ext import build_ext -from distutils.version import LooseVersion +# Available at setup time due to pyproject.toml +from pybind11.setup_helpers import Pybind11Extension, build_ext +__version__ = "1.0.0" -class CMakeExtension(Extension): - def __init__(self, name, sourcedir=''): - Extension.__init__(self, name, sources=[]) - self.sourcedir = os.path.abspath(sourcedir) +# Note: +# Sort input source files if you glob sources to ensure bit-for-bit +# reproducible builds (https://github.com/pybind/python_example/pull/53) - -class CMakeBuild(build_ext): - def run(self): - try: - out = subprocess.check_output(['cmake', '--version']) - except OSError: - raise RuntimeError("CMake must be installed to build the following extensions: " + - ", ".join(e.name for e in self.extensions)) - - if platform.system() == "Windows": - cmake_version = LooseVersion(re.search(r'version\s*([\d.]+)', out.decode()).group(1)) - if cmake_version < '3.1.0': - raise RuntimeError("CMake >= 3.1.0 is required on Windows") - - for ext in self.extensions: - self.build_extension(ext) - - def build_extension(self, ext): - extdir = os.path.abspath(os.path.dirname(self.get_ext_fullpath(ext.name))) - # required for auto-detection of auxiliary "native" libs - if not extdir.endswith(os.path.sep): - extdir += os.path.sep - - cmake_args = ['-DCMAKE_LIBRARY_OUTPUT_DIRECTORY=' + extdir, - '-DPYTHON_EXECUTABLE=' + sys.executable] - - cfg = 'Debug' if self.debug else 'Release' - build_args = ['--config', cfg] - - if platform.system() == "Windows": - cmake_args += ['-DCMAKE_LIBRARY_OUTPUT_DIRECTORY_{}={}'.format(cfg.upper(), extdir)] - if sys.maxsize > 2**32: - cmake_args += ['-A', 'x64'] - build_args += ['--', '/m'] - else: - cmake_args += ['-DCMAKE_BUILD_TYPE=' + cfg] - build_args += ['--', '-j2'] - - env = os.environ.copy() - env['CXXFLAGS'] = '{} -DVERSION_INFO=\\"{}\\"'.format(env.get('CXXFLAGS', ''), - self.distribution.get_version()) - if not os.path.exists(self.build_temp): - os.makedirs(self.build_temp) - subprocess.check_call(['cmake', ext.sourcedir] + cmake_args, cwd=self.build_temp, env=env) - subprocess.check_call(['cmake', '--build', '.'] + build_args, cwd=self.build_temp) +ext_modules = [Pybind11Extension( + "oxenmq", + ["src/bencode.cpp", "src/module.cpp", "src/oxenmq.cpp"], + cxx_std=17, + libraries=["oxenmq"], + ), +] setup( - name='pyoxenmq', - version='0.1.0', - author='Jeff Becker', - author_email='jeff@i2p.rocks', - description='pybind oxenmq bindings', - long_description='', - ext_modules=[CMakeExtension('pyoxenmq')], - packages=["lokinet.auth"], - cmdclass=dict(build_ext=CMakeBuild), + name="oxenmq", + version=__version__, + author="Jason Rhinelander", + author_email="jason@oxen.io", + url="https://github.com/oxen-io/oxen-mq", + description="Python wrapper for oxen-mq message passing library", + long_description="", + ext_modules=ext_modules, + cmdclass={"build_ext": build_ext}, zip_safe=False, ) diff --git a/src/bencode.cpp b/src/bencode.cpp new file mode 100644 index 0000000..82419f3 --- /dev/null +++ b/src/bencode.cpp @@ -0,0 +1,15 @@ +#include "common.hpp" +#include "oxenmq/base32z.h" + +namespace oxenmq { + +void BEncode_Init(py::module& mod) { + mod.def("base32z_encode", [](py::bytes data) { + char* ptr = nullptr; + py::ssize_t sz = 0; + PyBytes_AsStringAndSize(data.ptr(), &ptr, &sz); + return oxenmq::to_base32z(ptr, ptr+sz); + }); +} + +} diff --git a/pyoxenmq/common.hpp b/src/common.hpp similarity index 57% rename from pyoxenmq/common.hpp rename to src/common.hpp index a6b3d31..4bb8dce 100644 --- a/pyoxenmq/common.hpp +++ b/src/common.hpp @@ -5,11 +5,9 @@ namespace py = pybind11; -namespace oxenmq -{ - void - OxenMQ_Init(py::module &mod); +namespace oxenmq { + +void OxenMQ_Init(py::module &mod); +void BEncode_Init(py::module & mod); - void - BEncode_Init(py::module & mod); } diff --git a/src/module.cpp b/src/module.cpp new file mode 100644 index 0000000..a9da6ba --- /dev/null +++ b/src/module.cpp @@ -0,0 +1,6 @@ +#include "common.hpp" + +PYBIND11_MODULE(oxenmq, m) { + oxenmq::OxenMQ_Init(m); + oxenmq::BEncode_Init(m); +} diff --git a/src/oxenmq.cpp b/src/oxenmq.cpp new file mode 100644 index 0000000..ac94e2f --- /dev/null +++ b/src/oxenmq.cpp @@ -0,0 +1,953 @@ +#include "common.hpp" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace oxenmq { + +// Convert a py::object containing a str, bytes, or iterable over str/bytes to a vector of message +// parts. Throws on invalid input, otherwise returns a vector of parts. +// +// The gil must be held. +// +// The first version appends into `parts`, the second version returns a vector. +void extract_data_parts(std::vector& parts, py::handle obj) { + if (py::isinstance(obj)) + parts.push_back(obj.cast()); + else if (py::isinstance(obj)) + parts.push_back(obj.cast()); + else if (py::isinstance(obj)) { + for (auto o : obj) { + if (py::isinstance(o)) + parts.push_back(o.cast()); + else if (py::isinstance(o)) + parts.push_back(o.cast()); + else + throw std::runtime_error{"invalid iterable containing '" + std::string{py::repr(o)} + "': expected bytes/str"}; + } + } else { + throw std::runtime_error{"invalid value '" + std::string{py::repr(obj)} + "': expected bytes/str/iterable"}; + } +} +std::vector extract_data_parts(py::handle obj) { + std::vector parts; + extract_data_parts(parts, obj); + return parts; +} +std::vector extract_data_parts(py::args& args) { + std::vector data; + for (auto arg: args) + extract_data_parts(data, arg); + return data; +} + +// Quick and dirty logger that logs to stderr. It would be much nicer to take a python function, +// but that deadlocks pretty much right away because of the crappiness of the gil. +struct stderr_logger { + inline static std::mutex log_mutex; + + void operator()(LogLevel lvl, const char* file, int line, std::string msg) { + std::lock_guard l{log_mutex}; + std::cerr << '[' << lvl << "][" << file << ':' << line << "]: " << msg << "\n"; + } +}; + +constexpr auto noopt = [] {}; + +void +OxenMQ_Init(py::module & mod) +{ + using namespace pybind11::literals; + constexpr py::kw_only kwonly{}; + + py::class_(mod, "ConnectionID") + .def(py::self == py::self) + .def(py::self != py::self) + .def_property_readonly("service_node", &ConnectionID::sn) + .def_property_readonly("pubkey", [](const ConnectionID& c) { return py::bytes(c.pubkey()); }) + ; + py::class_
(mod, "Address") + .def(py::init(), "addr"_a, + R"(Constructs from an encoded address such as 'curve://HOSTNAME:PORT/PUBKEY'. + +See oxenmq::Address C++ documentation for more details)") + .def(py::init([](std::string_view addr, py::bytes pubkey) { return address{addr, (std::string) pubkey}; }), + "addr"_a, "pubkey"_a, + R"(Constructs from a ZMQ connection string and a 32-byte pubkey. + +This can be used for ipc+curve connections by using an address such as ipc:///path/to/socket)") + .def(py::init(py::overload_cast(&address::tcp)), + "host"_a, "port"_a, + R"(Construct a TCP address from a host and port. + +The connection will be plaintext. If the host is an IPv6 address it *must* be surrounded with [ and ].)") + .def(py::init([](std::string host, uint16_t port, py::bytes pubkey) { + return address::tcp_curve(std::move(host), port, pubkey); }), + "host"_a, "port"_a, "pubkey"_a, + "Constructs a curve-encrypted TCP address") + + .def_property("pubkey", + [](const address& a) { return py::bytes(a.pubkey); }, + [](address& a, std::optional pubkey) { + if (pubkey) a.set_pubkey(pubkey->operator std::string()); + else a.set_pubkey(""); + }, + R"(Sets or clears the address pubkey. + +If specifying a pubkey then this address is set to use curve encryption with the given 32-byte +`bytes` pubkey required for the remote endpoint. An existing pubkey is replaced, if present. + +If set to None then this address is set to use an unencrypted plaintext connection.)") + .def_property_readonly("curve", py::overload_cast<>(&address::curve, py::const_), "true if this is a curve-enabled address") + .def_property_readonly("tcp", py::overload_cast<>(&address::tcp, py::const_), "true if this is a tcp address") + .def_property_readonly("ipc", py::overload_cast<>(&address::ipc, py::const_), "true if this is an ipc address") + .def_property_readonly("zmq_address", &address::zmq_address, + "accesses the zmq address portion of the address (note that this does not contain any curve encryption information)") + .def_property_readonly("full_address", [](const address &a) { return a.full_address(address::encoding::base32z); }, + "returns the full address, including curve information, encoding the curve pubkey as base32z") + .def_property_readonly("full_address_b64", [](const address &a) { return a.full_address(address::encoding::base64); }, + "returns the full address, including curve information, encoding the curve pubkey as base64") + .def_property_readonly("full_address_hex", [](const address &a) { return a.full_address(address::encoding::hex); }, + "returns the full address, including curve information, encoding the curve pubkey as hex") + .def_property_readonly("qr", &address::qr_address, + R"(Access the full address as a RQ-encoding optimized string. + +Only available for tcp addresses. The resulting string only contains characters from the +Alphanumeric QR alphabet (i.e. all uppercase, and uses $...$ for IPv6 addresses instead of [...]), +allowing for more efficient QR encoding. This format can be passed to the single-argument Address +constructor.)") + .def(py::self == py::self) + .def(py::self != py::self) + ; + py::class_(mod, "TaggedThreadID"); + py::class_(mod, "TimerID"); + + py::enum_(mod, "AuthLevel") + .value("denied", AuthLevel::denied, + "Not actually an auth level, but can be returned by the AllowFunc to deny an incoming connection.") + .value("none", AuthLevel::none, + "No authentication at all; any random incoming ZMQ connection can invoke this command.") + .value("basic", AuthLevel::basic, + "Basic authentication commands require a login, or a node that is specifically configured to be a public node (e.g. for public RPC).") + .value("admin", AuthLevel::admin, + R"(Advanced authentication commands require an admin user, either via explicit login or by implicit login from localhost. + +This typically protects administrative commands like shutting down or access to sensitive data.)") + ; + + py::class_(mod, "Access", "The access level for a command category.") + .def(py::init(), + "auth"_a = AuthLevel::none, + "remote_sn"_a = false, + "local_sn"_a = false, + R"(Specifies a command category access level. + +- auth - the required access level to access commands in this category. + +- remote_sn - if True then this command may only be invoked by remote connections who we recognize + (by pubkey) as being service nodes. (Requires sn_lookup to be provided during construction). + +- local_sn - if True then this command will be unavailable if the OxenMQ object was not constructed + with service_node=True.)"); + py::implicitly_convertible(); + + py::class_ msg(mod, "Message", "Temporary object containing details of a just-received message"); + msg + .def_property_readonly("is_reply", [](const Message& m) { return !m.reply_tag.empty(); }, + "True if this message is expecting a reply (i.e. it was received on a request_command endpoint)") + .def_readonly("remote", &Message::remote, R"(Some sort of remote address from which the request came. + +Typically the IP address string for TCP connections and "localhost:UID:GID:PID" for unix socket IPC connections.)") + .def_readonly("conn", &Message::conn, "The connection ID info for routing a reply") + .def_readonly("access", &Message::access, + "The access level of the invoker (which can be higher than the access level required for the command category") + .def("dataview", [](const Message& m) { + py::list l{m.data.size()}; + for (auto& part : m.data) + l.append(py::memoryview::from_memory(part.data(), part.size())); + return l; + }, + R"(Returns a list of the data message parts as memoryviews. + +Note that the returned views are only valid for the duration of the callback invoked with the +Message; if you need them beyond that then you must copy them (e.g. by calling message.data() +or .to_bytes() on each one)" + ) + .def("data", [](const Message& m) { + py::list l{m.data.size()}; + for (auto& part : m.data) + l.append(py::bytes{part.data(), part.size()}); + return l; + }, + "Returns a *copy* of the data message parts as a list of `bytes`." + ) + .def("reply", [](Message& m, py::args args) { + m.send_reply(send_option::data_parts(extract_data_parts(args))); + }, + R"(Sends a reply back to this caller. + +`args` must be bytes, str, or iterables thereof (and will be flatted). Should only be used from a +request_command endpoint (i.e. when .is_reply is true)") + .def("back", [](Message& m, std::string command, py::args args) { + m.send_back(command, send_option::data_parts(extract_data_parts(args))); + }, + "command"_a, + R"(Sends a new message (NOT a reply) to the caller. + +This is used to send a simple message back to the caller which is *not* specifically a request reply +but rather is simply an endpoint to invoke on the caller who sent this message to us. `command` +should be the command endpoint, and `response` must be none (for no data parts), bytes, str, or an +iterable thereof. + +This is a shortcut for `oxenmq.send(msg.conn, command, *args)`; use the full version if you need +extra send functionality.)") + .def("request", [](Message& m, std::string command, OxenMQ::ReplyCallback callback, py::args args) { + std::vector data; + m.send_request(command, std::move(callback), send_option::data_parts(extract_data_parts(args))); + }, + "command"_a, "on_reply"_a, + R"(Sends a new request (NOT a reply) back to the remote caller. + +This is used to send a new request back to the caller which is *not* specifically a request reply +but rather is simply a new request endpoint to invoke on the caller who sent this message to us. +`command` should be the command endpoint, and `response` must be none (for no data parts), bytes, +str, or an iterable thereof. + +This is a shortcut for `oxenmq.request(msg.conn, command, on_reply, *args)`; use the full version if you need +extra send functionality.)") + .def("later", &Message::send_later, + R"(Returns a proxy object which can be stored and used to reply to the remote caller at a later point. + +Unlike Message, the value returned here may outlive the command callback (as long as the OxenMQ +instance is still alive).)") + ; + + py::class_(msg, "DeferredSend") + .def_property_readonly("is_reply", [](const Message::DeferredSend& m) { return !m.reply_tag.empty(); }, + "True if this message is expecting a reply (i.e. it was received on a request_command endpoint)") + .def("reply", [](Message::DeferredSend& d, py::args args) { + d.reply(send_option::data_parts(extract_data_parts(args))); + }, + "Same as Message.reply(), but deferrable") + .def("back", [](Message::DeferredSend& d, std::string command, py::args args) { + d.back(command, send_option::data_parts(extract_data_parts(args))); + }, + "command"_a, + "Same as Message.back(), but deferrable") + .def("request", [](Message::DeferredSend& d, std::string command, OxenMQ::ReplyCallback callback, py::args args) { + d.request(command, std::move(callback), send_option::data_parts(extract_data_parts(args))); + }, + "command"_a, "on_reply"_a, + "Same as Message.request(), but deferrable") + .def("__call__", [](Message::DeferredSend& d, py::args args, py::kwargs kwargs) { + auto method = py::cast(&d).attr(!d.reply_tag.empty() ? "reply" : "back"); + return method(*args, **kwargs); + }, + "Equivalent to `reply(...)` for a request message, `back(...)` for a non-request message") + ; + + py::class_(mod, "Category", + "Helper class to add in registering category commands, returned from OxenMQ.add_category(...)") + .def("add_command", &CatHelper::add_command) + .def("add_request_command", + [](CatHelper &cat, + std::string name, + std::function handler) + { + cat.add_request_command(name, [handler](Message& msg) { + std::vector result; + { + py::gil_scoped_acquire gil; + + py::object obj = handler(&msg); + if (obj.is_none()) + return; + try { + result = extract_data_parts(obj); + } catch (const std::exception& e) { + msg.oxenmq.log(LogLevel::warn, __FILE__, __LINE__, + "Python callback returned "s + e.what()); + return; + } + } + msg.send_reply(send_option::data_parts(result)); + }); + }, + R"(Add a request command to this category. + +Adds a request command, that is, a command that is always expected to reply, to this category. The +callback must return one of: + +- None - no reply will be sent; typically returned because you sent it yourself (via + Message.send_reply()), or because you want to send it later via Message.send_later(). +- bytes - will be sent as is in a single-part reply. +- str - will be sent in utf-8 encoding in a single-part reply. +- iterable object containing bytes and/or str elements: will be sent as a multi-part reply where + each part is sent as-is (bytes) or utf8-encoded (str). + +The callback also must take care not to save the provided `Message` value beyond the end of the +callback itself.)") + ; + + py::enum_(mod, "LogLevel") + .value("fatal", LogLevel::fatal).value("error", LogLevel::error).value("warn", LogLevel::warn) + .value("info", LogLevel::info).value("debug", LogLevel::debug).value("trace", LogLevel::trace); + + py::class_ oxenmq{mod, "OxenMQ"}; + oxenmq + .def(py::init<>()) + .def(py::init([](LogLevel level) { + // Quick and dirty logger that logs to stderr. It would be much nicer to take a python + // function, but that deadlocks pretty much right away because of the crappiness of the gil. + return std::make_unique(stderr_logger{}, level); + })) + .def(py::init([](py::bytes pubkey, py::bytes privkey, bool sn, OxenMQ::SNRemoteAddress sn_lookup, std::optional log_level) { + return std::make_unique(pubkey, privkey, sn, std::move(sn_lookup), + log_level ? OxenMQ::Logger{stderr_logger{}} : nullptr, + log_level.value_or(LogLevel::warn)); + }), + "pubkey"_a = "", "privkey"_a = "", "service_node"_a = false, + "sn_lookup"_a = nullptr, "log_level"_a = LogLevel::warn, + R"(OxenMQ constructor. + +This constructs the object but does not start it; you will typically want to first add categories +and commands, then finish startup by invoking `start()`. (Categories and commands cannot be added +after startup). + +Parameters: +- pubkey - the x25519 public key (32-byte binary string). For a service node this is the service + node x25519 keypair. For non-service nodes this (and privkey) can be empty strings to + automatically generate an ephemeral keypair. + +- privkey the service node's private key (32-byte binary string), or empty to generate one. + +- service_node - True if this instance should be considered a service node for the purpose of + allowing "Access::local_sn" remote calls. (This should be true if we are *capable* of being a + service node, whether or not we are currently actively). If specified as true then the pubkey and + privkey values must not be empty. + +- sn_lookup - function that takes a pubkey key (32-byte bytes) and returns a connection string such + as "tcp://1.2.3.4:23456" to which a connection should be established to reach that service node. + Note that this function is only called if there is no existing connection to that service node, + and that the function is never called for a connection to self (that uses an internal connection + instead). Also note that the service node must be listening in curve25519 mode (otherwise we + couldn't verify its authenticity). Should return empty for not found or if SN lookups are not + supported. If omitted a stub function is used that always returns empty. + +- log_level the initial log level; defaults to warn. The log level can be changed later by calling + log_level(...). Note that currently all logging goes directly to stderr because of obstacles with + Python's GIL. In the future a Python callback may be supported for log messages. +)") + + .def_readwrite("handshake_time", &OxenMQ::HANDSHAKE_TIME, + "How long to wait for handshaking to complete on new connections before timing out.") + .def_readwrite("ephemeral_routing_id", &OxenMQ::EPHEMERAL_ROUTING_ID, + R"(Whether to use random connection IDs for outgoing connections. + +If set to True then use random connection IDs for each outgoing connection rather than the default +IDs based on the local pubkey (False). Using the same connection ID allows re-establishing +connections (even after an application restart) with a remote without losing incoming messages, but +does not allow multiple connections to a remote using the same keypair.)") + .def_readwrite("max_message_size", &OxenMQ::MAX_MSG_SIZE, + R"(Maximum incoming message size. + +If a remote attempts to send something larger the connection is closed. -1 means no limit.)") + .def_readwrite("max_sockets", &OxenMQ::MAX_SOCKETS, + R"(Maximum open sockets supported by the internal zmq layer. + +Defaults to a large number. If changing then this must be set before start() is called.)") + .def_readwrite("reconnect_interval", &OxenMQ::RECONNECT_INTERVAL, + "Minimum time to wait before attempting to reconnect a failed connection.") + .def_readwrite("reconnect_interval_max", &OxenMQ::RECONNECT_INTERVAL_MAX, + R"(Maximum reconnect interval. + +When larger than omq.reconnect_interval then upon subsequent reconnection failures an +exponential backoff will be used, up to at most this interval between reconnection attempts.)") + .def_readwrite("close_linger", &OxenMQ::CLOSE_LINGER, + "How long (at most) to wait for connections to close cleanly when closing.") + .def_readwrite("connection_check_interval", &OxenMQ::CONN_CHECK_INTERVAL, + R"(How frequently we cleanup connections. + +Cleaning up involves closing idle connections and calling connect or request failure callbacks. +Making this slower results in more "overshoot" before failure callbacks are invoked; making it too +fast results in more proxy thread overhead. Any change to this variable must be set before calling +start().)") + .def_readwrite("connection_heartbeat", &OxenMQ::CONN_HEARTBEAT, + R"(Whether to enable heartbeats on incoming/outgoing connections. + +If set to > 0 then we set up ZMQ to send a heartbeat ping over the socket this often, which helps +keep the connection alive and lets failed connections be detected sooner (see also +connection_heartbeat_timeout). Changing the value only affects new connections. + +Defaults to 15s)") + .def_readwrite("connection_heartbeat_timeout", &OxenMQ::CONN_HEARTBEAT_TIMEOUT, + R"(How long after missing heartbeats to consider a socket dead. + +When .conn_heartbeat is enabled, this sets how long we wait for a reply on a socket before +considering the socket to have died and closing it. Changing the value only affects new +connections.)") + .def_readwrite("startup_umask", &OxenMQ::STARTUP_UMASK, + R"(The umask to apply when constructing sockets + +This primarily affects any new ipc:// listening sockets that get created. Does nothing if set to -1 +(the default), and does nothing on Windows. Note that the umask is applied temporarily during +`start()`, so may affect other threads that create files/directories at the same time as the start() +call.)") + .def_property_readonly("pubkey", + [](const OxenMQ& self) { + auto& pub = self.get_pubkey(); + return py::bytes(pub.data(), pub.size()); + }, + "Accesses this OxenMQ's x25519 public key, as bytes.") + .def_property_readonly("privkey", + [](const OxenMQ& self) { + auto& priv = self.get_privkey(); + return py::bytes(priv.data(), priv.size()); + }, + "Accesses this OxenMQ's x25519 private key, as bytes.") + .def("start", &OxenMQ::start, R"(Starts the OxenMQ object. + +This is called after all initialization (categories, etc.) is configured. This binds to the bind +locations given in the constructor and launches the proxy thread to handle message dispatching +between remote nodes and worker threads. + +Things you want to do before calling this: +- Use `add_category`/`add_command` to set up any commands remote connections can invoke. +- If any commands require SN authentication, specify a list of currently active service node + pubkeys via `set_active_sns()` (and make sure this gets updated when things change by + another `set_active_sns()` or a `update_active_sns()` call). It *is* possible to make the + initial call after calling `start()`, but that creates a window during which incoming + remote SN connections will be erroneously treated as non-SN connections. +- If this LMQ instance should accept incoming connections, set up any listening ports via + `listen_curve()` and/or `listen_plain()`.)") + .def("listen", [](OxenMQ& self, + std::string bind, + bool curve, + OxenMQ::AllowFunc allow, + std::function on_bind) { + if (curve) + self.listen_curve(bind, std::move(allow), std::move(on_bind)); + else + self.listen_plain(bind, std::move(allow), std::move(on_bind)); + }, + "bind"_a, "curve"_a, kwonly, "allow_connection"_a = nullptr, "on_bind"_a = nullptr, + R"(Start listening on the given bind address. + +Incoming connections can come from anywhere. `allow_connection` is invoked for any incoming +connections on this address to determine the incoming remote's access and authentication level. + +This method may be called after start if dynamic listening is required, but it is generally +recommended that long-term fixed listening endpoints be set up by calling this *before* start(). + +Parameters: + +- bind - can be any bind address string zmq supports, for example a tcp IP/port combination such as: + "tcp://*:4567" or "tcp://1.2.3.4:5678". + +- curve - whether the connection is curve-encrypted (True) or plaintext (False). For plaintext + connections the allow_connection callback will be invoked with an empty remote pubkey and + service_node set to False. + +- allow_connection function to call to determine whether to allow the connection and, if so, the + authentication level it receives. The function is called with the remote's address, the remote's + 32-byte pubkey (only for curve; empty for plaintext), and whether the remote is recognized as a + service node (always False for plaintext; requires sn_lookup being configured in construction). + If omitted (or null) the default returns AuthLevel::none access for all incoming connections. + +- on_bind a callback to invoke when the port has been successfully opened or failed to open, called + with a single boolean argument of True for success, False for failure. For addresses set up + before .start() this will be called during `start()` itself; for post-start listens this will be + called from the proxy thread when it opens the new port. Note that this function is called + directly from the proxy thread and so should be fast and non-blocking. +)") + .def("add_tagged_thread", [](OxenMQ& self, std::string name, std::function start) { + return self.add_tagged_thread(std::move(name), std::move(start)); + }, + "name"_a, kwonly, "start"_a = std::nullopt, + R"(Creates a "tagged thread". + +This creates a thread with a specific "tag" and starts it immediately. A tagged thread is one that +batches, jobs, and timer jobs can be sent to specifically, typically to perform coordination of some +thread-unsafe work, that will be reserved for use only for jobs that specifically request its use. + +Tagged threads will *only* process jobs sent specifically to them; they do not participate in the +thread pool used for regular jobs. Each tagged thread also has its own job queue completely +separate from any other jobs. + +Tagged threads must be created *before* `start()` is called. The name will be used to set the +thread name in the process table (if supported on the OS). + +Parameters: + +- name - the name of the thread; will be used in log messages and (if supported by the OS) as the + system thread name. + +- start - an optional callback to invoke from the thread as soon as OxenMQ itself starts up (i.e. + after a call to `start()`). + +Returns a TaggedThreadID object that can be passed to job(), batch(), or add_timer() to direct the +task to the tagged thread. +)") + .def("set_general_threads", &OxenMQ::set_general_threads, "threads"_a, + R"(Sets the number of general threads to use for handling requests. + +This controls the maximum number of threads that may be created to deal with general tasks such as +handling incoming commands. Cannot be called after `start()`. + +If not called then the default is to use the number of CPUs/threads detected on the system. + +Changing this also affects the default value of set_batch_threads() and set_reply_threads().)") + .def("set_reply_threads", &OxenMQ::set_reply_threads, "threads"_a, + + R"(Set the minimum number of threads used for processing replies. + +This sets the minimum number of threads dedicated to processing incoming request commands (i.e. that +need a reply) and related operations such as the on_success/on_failure handlers for establishing +connections. Cannot be called after `start()`. + +If unset, this defaults to one-eighth of the number of general threads, rounded up. + +Note that any such tasks would *first* use general threads; this limit would allow spawning new +threads only if all general threads are currently busy *and* there are not at least this many reply +tasks currently being processed.)") + + .def("set_batched_threads", &OxenMQ::set_batch_threads, + "threads"_a, R"(Set the maximum number of threads to spawn for batch jobs. + +This sets the limit on the maximum number of threads that may be created to process batch jobs, +including explicit batch jobs and timers scheduled via add_timer(), but not including jobs that are +sent to a specific TaggedThreadID. Cannot be called after `start()`. + +Batch jobs will first attempt to use an available general thread; an extra thread will be spawned to +handle a batch job only if all general threads are currently busy *and* fewer than this many threads +are currently processing batch jobs.)") + + .def("add_timer", py::overload_cast< + std::function, + std::chrono::milliseconds, + bool, + std::optional>(&OxenMQ::add_timer), + "job"_a, "interval"_a, kwonly, "squelch"_a = true, "thread"_a = std::nullopt, + R"(Adds a callback to be invoked on a repeating timer. + +The callback will be invoke approximately every `interval`. + +Optional parameters: + +- squelch - When True (the default) this job will not be double-booked: that is, the callback will + be skipped if a previous callback from this timer is already scheduled (or is still running). If + set to False then the callback will be scheduled even if an existing callback has not yet + completed. + +- thread - a TaggedThreadID specifying a tagged thread (created with `add_tagged_thread`) in which + the timer should run. If unspecified then the timer runs in the general batch job queue.)") + .def("cancel_timer", &OxenMQ::cancel_timer, R"(Cancels a timer previously added with add_timer(). + +Note that this does not queue any already-scheduled timer job and as a result it is possible for an +already-scheduled timer job to run *after* this call removes the timer. + +It is permitted to call this with the same TimerID multiple times; subsequent calls have no effect.)") + .def("job", &OxenMQ::job, "job"_a, py::kw_only(), "thread"_a = std::nullopt, + R"(Queues a job to be run as an OxenMQ job. + +This submits a callback to be invoked by OxenMQ. The job can either be scheduled with general batch +jobs or can be directed to a specific tagged thread (created with `add_tagged_thread`).)") + .def("add_category", &OxenMQ::add_category, + "name"_a, "access_level"_a, kwonly, "reserved_threads"_a = 0, "max_queue"_a = 200, + py::keep_alive<0, 1>(), + R"(Add a new command category. + +Returns an object that adds commands or request commands to the category. + +Parameters: + +name - the category name; must be non-empy and may not contain a ".". + +access_level - the access requirements for remote invocation of the commands inside this category. +Can either be a full Access instance or, when you don't care about SN status, simply an AuthLevel +value. + +reserved_threads - if > 0 then always allow this many endpoints of this category to be processed at +once. In particular if no worker thread is available and fewer than this are currently processing +commands in this category then we will spawn a new thread to handle the task. This is used to +ensure that some categories always have processing capacity even if other long-running tasks are +using all general threads. + +max_queue - the maximum number of incoming commands that will be queued for commands in this +category waiting for an available thread to process them before we start dropping new incoming +commands. -1 means unlimited, 0 means we never queue (i.e. we drop if no thread is immediately +available). +)") + .def("add_command_alias", &OxenMQ::add_command_alias, + "from"_a, "to"_a, + R"(Adds a command alias. + +This allows adding backwards-compatible aliases for commands that have moved. For example mapping +"cat.meow" to "dog.bark" would make any incoming requests to the cat.meow endpoint be treated as if +they had requested the dog.bark endpoint. Note that this mapping happens *before* applying category +permissions: in this example, the required permissions the access the endpoint would be those of the +"dog" category rather than the "cat" category.)") + .def("connect_remote", [](OxenMQ& self, + const address& remote, + OxenMQ::ConnectSuccess on_success, + OxenMQ::ConnectFailure on_failure, + std::optional timeout, + std::optional ephemeral_routing_id) { + + return self.connect_remote(remote, std::move(on_success), std::move(on_failure), + connect_option::timeout{timeout.value_or(oxenmq::REMOTE_CONNECT_TIMEOUT)}, + connect_option::ephemeral_routing_id{ephemeral_routing_id.value_or(self.EPHEMERAL_ROUTING_ID)} + ); + }, + "remote"_a, "on_success"_a, "on_failure"_a, + kwonly, + "timeout"_a = std::nullopt, "ephemeral_routing_id"_a = std::nullopt, + R"( +Starts connecting to a remote address and return immediately. The connection can be used +immediately, however messages will only be queued until the connection is established (or dropped if +the connection fails). The given callbacks are invoked for success or failure. + +`ephemeral_routing_id` and `timeout` allowing overriding the defaults (oxenmq.EPHEMERAL_ROUTING_ID +and 10s, respectively). +)") + .def("connect_remote", [](OxenMQ& self, const address& remote, std::chrono::milliseconds timeout) { + std::promise promise; + self.connect_remote( + remote, + [&promise](ConnectionID id) { promise.set_value(std::move(id)); }, + [&promise](auto, std::string_view reason) { + promise.set_exception(std::make_exception_ptr( + std::runtime_error{"Connection failed: " + std::string{reason}})); + }); + return promise.get_future().get(); + }, "remote"_a, "timeout"_a = oxenmq::REMOTE_CONNECT_TIMEOUT, + R"(Simpler version of connect_remote that connects to a remote address synchronously. + +This will block until the connection is established or times out; throws on connection failure, +returns the ConnectionID on success. + +Takes the address and an optional `timeout` to override the timeout (default 10s))") + .def("connect_sn", [](OxenMQ& self, + py::bytes pubkey, + std::optional keep_alive, + std::optional remote_hint, + std::optional ephemeral_routing_id) { + return self.connect_sn(std::string{pubkey}, + connect_option::keep_alive{keep_alive.value_or(-1ms)}, + connect_option::hint{remote_hint.value_or("")}, + connect_option::ephemeral_routing_id{ephemeral_routing_id.value_or(self.EPHEMERAL_ROUTING_ID)}); + }, "pubkey"_a, kwonly, "keep_alive"_a, "remote_hint"_a, "ephemeral_routing_id"_a, + R"(Connect to a remote service node by pubkey. + +Try to initiate a connection to the given SN in anticipation of needing a connection in the future. +If a connection is already established, the connection's idle timer will be reset (so that the +connection will not be closed too soon). If the given idle timeout is greater than the current idle +timeout then the timeout increases to the new value; if less than the current timeout it is ignored. +(Note that idle timeouts only apply if the existing connection is an outgoing connection). + +Note that this method (along with send) doesn't block waiting for a connection; it merely instructs +the proxy thread that it should establish a connection. + +Parameters: +- pubkey - the public key (length 32 bytes value) of the service node to connect to; the remote's + address will be determined by calling the sn_lookup function giving during construction. + +- keep_alive - how long the SN connection will be kept alive after valid activity. Defaults to 5 + minutes, which is notably longer than the default 30s for automatic connections established when + using `send()` with a pubkey. + +- remote_hint - a connection string that *may* be used instead of doing a lookup via the sn_lookup + callback to find the remote address. Typically provided only if the location has already been + looked up for some other reason. + +- ephemeral_routing_id - if set, override the default OxenMQ.EPHEMERAL_ROUTING_ID for this + connection. + +Returns a ConnectionID that identifies an connection with the given SN. Typically you *don't* need +to worry about saving this (and can just discard it): you can always simply pass the pubkey into +send/request methods to send to the SN by pubkey. +)") + .def("connect_inproc", &OxenMQ::connect_inproc<>, + "on_success"_a, "on_failure"_a, + R"(Establish a connection to ourself. + +Connects to the built-in in-process listening socket of this OxenMQ server for local communication. +Note that auth_level defaults to admin (unlike connect_remote), and the default timeout is much +shorter. + +Also note that incoming inproc requests are unauthenticated: that is, they will always have +admin-level access. +)") + .def("disconnect", &OxenMQ::disconnect, + "conn"_a, "linger"_a = 1s, + R"(Disconnect an established connection. + +Disconnects an established outgoing connection established with `connect_remote()` (or, less +commonly, `connect_sn()`). + +`linger` allows you to control how long (at most) the connection will be allowed to linger if there +are still pending messages to be delivered; if those messages are not delivered within the given +time then the connection is closed anyway. (Note that this is non-blocking: the lingering occurs in +the background).)") + + .def("send", [](OxenMQ& self, std::variant conn, + std::string command, + py::args args, py::kwargs kwargs) { + + if (auto* bytes = std::get_if(&conn)) { + if (len(*bytes) != 32) + throw std::logic_error{"Error: send(...) to=pubkey requires 32-byte pubkey"}; + conn.emplace(*bytes); + } + + bool request = kwargs.contains("request") && kwargs["request"].cast(); + std::optional on_reply, on_reply_failure; + if (request) { + if (kwargs.contains("on_reply")) + on_reply = kwargs["on_reply"].cast(); + if (kwargs.contains("on_reply_failure")) + on_reply_failure = kwargs["on_reply_failure"].cast(); + } else if (kwargs.contains("on_reply") || kwargs.contains("on_reply_failure")) { + throw std::logic_error{"Error: send(...) on_reply=/on_reply_failure= option " + "requires request=True (perhaps you meant to use `.request(...)` instead?)"}; + } + send_option::hint hint{kwargs.contains("remote_hint") ? kwargs["remote_hint"].cast() : ""s}; + send_option::optional optional{kwargs.contains("optional") && kwargs["optional"].cast()}; + send_option::incoming incoming{kwargs.contains("incoming_only") && kwargs["incoming_only"].cast()}; + send_option::outgoing outgoing{kwargs.contains("outgoing") && kwargs["outgoing"].cast()}; + send_option::keep_alive keep_alive{ + kwargs.contains("keep_alive") ? kwargs["keep_alive"].cast() : -1ms}; + send_option::request_timeout request_timeout{ + kwargs.contains("request_timeout") ? kwargs["request_timeout"].cast() : -1ms}; + send_option::queue_failure qfail; + if (kwargs.contains("queue_failure")) + qfail.callback = [f = kwargs["queue_failure"].cast>()] + (const zmq::error_t* exc) { + if (exc) + f(exc->what()); + }; + send_option::queue_full qfull; + if (kwargs.contains("queue_full")) + qfull.callback = kwargs["queue_full"].cast>(); + + std::vector data; + for (auto arg: args) + extract_data_parts(data, arg); + + if (!request) { + self.send(std::get(conn), command, + hint, optional, incoming, outgoing, keep_alive, request_timeout, + std::move(qfail), std::move(qfull)); + } else { + auto reply_cb = [on_reply = std::move(on_reply), on_fail = std::move(on_reply_failure)] + (bool success, std::vector data) { + + if (success ? !on_reply : !on_fail) + return; + py::gil_scoped_acquire gil; + py::list l; + if (success) { + for (const auto& part : data) + l.append(py::memoryview::from_memory(part.data(), part.size())); + (*on_reply)(l); + } else if (on_fail) { + for (const auto& part : data) + l.append(py::bytes(part.data(), part.size())); + (*on_fail)(l); + } + }; + + self.request(std::get(conn), command, std::move(reply_cb), + hint, optional, incoming, outgoing, keep_alive, request_timeout, + std::move(qfail), std::move(qfull)); + } + }, + "conn"_a, "command"_a, + R"(Sends a message or request to a remote. + +The message is passed to the internal proxy thread to be queued for delivery (i.e. this function +does not block). + +Parameters: + +- conn - the ConnectionID or the 32-byte `bytes` pubkey to send the message to; the latter generally + requires construction with a sn_lookup callback to resolve pubkeys to connection strings. + +- command - the endpoint name, e.g. "category.command". This can be a command or a request endpoint + on the remote, but note that to properly speak to request endpoints you will also need to specify + `request=True` (or use the `.request()` wrapper method). + +- args - any additional non-keyword arguments must be str, bytes, or iterables of strings or bytes. + These will be flattened and sent as the data part of the message. str values are decoded to utf8; + bytes values are sent as-is. + +The following keyword arguments may be provided: + +- request - if true, send this as a request rather than a command. This *must* be True for request + endpoints and must be False (or omitted) for non-request endpoints. See also `request()` which + wraps `send()` to specify this for you. + +- on_reply - function to call when a response to the request is received, when making a request with + request=True. The function will be invoked with a single argument of a list of memoryviews into + the data returned by the remote side. Note that these views are only valid for the duration of + the callback: if the data needs to be preserved beyond the callback then the callback must copy + it. If this value is omitted or None then any successful response is simply discarded. + +- on_reply_failure - function to call if we do not get a successful reply, either for a timeout or + because the remote sent us a failure reply. Called with a list of bytes containing failure + information. The most common responses are: + + [b'TIMEOUT'] - no reply was received within the request timeout period + [b'UNKNOWNCOMMAND'] - the remote did not understand the given request command + [b'FORBIDDEN'] - this client does not have the requested access to invoke that command + [b'FORBIDDEN_SN'] - the command is only available to service nodes and our pubkey was not + recognized as an active service node pubkey. + [b'NOT_A_SERVICE_NODE'] - the command is only invokable on service nodes, and the remote is not + currently running as a service node. + + Note, however, that empty responses are possible, and that the array can contain multiple elements + if additional context is provided by the other end. + +- request_timeout - how long to wait for a reply to the request before giving up and calling the + reply callback with a failure status. The default, if unspecified, is 15 seconds. Should only be + specified when request=True. + +- queue_failure - a callback to invoke with an error message if we are unable to queue the message + for some reason (e.g. because the recipient is no longer reachable available). This does *not* + include an inability because we have too much queued already: see the next option for that. Note + that queueing occurs in another thread and so this method will be invoked sometime *after* the + send() call returns. + +- queue_full - a callback to invoke if we are unable to queue the message because we have too many + messages already queued for delivery to that remote. Typically this indicates some network + connectivity problem preventing delivery, or the remote may be non-responsive. + +For messages being sent to service nodes by pubkey (which requires having provided a `sn_lookup` +function during construction) the following options are also available: + +- optional - if True then only send this message if we are already connected to the given service + node, but do not establish a new connection if we are not. Only has an effect when `conn` refers + to a service node pubkey. + +- incoming_only - if True then only send this message along an incoming connection from the + service node pubkey, and do not send it if we have no such connection. + +- outgoing - if True then only send this message to the remote using an outgoing connection; an + existing connection will be used if already established, otherwise a new outgoing connection will + be made. Unlike a send() call without this option, existing *incoming* connections from the + referenced service nodes will not be used to send the message. + +- keep_alive - if specified and set to a datetime.timedelta then the given value will be used for + the keep-alive of an outgoing connection; for existing connections the keep-alive will be + increased if currently shorter, and for new connections this sets the keep-alive. Has no effect + if the messages uses an existing incoming connection. +)") + .def("request", [](py::handle self, py::args args, py::kwargs kwargs) { + self.attr("send")(*args, **kwargs, "request"_a = true); + }, + "Convenience shortcut for oxenmq.send(..., request=True)") + .def("inject_task", &OxenMQ::inject_task, + "category"_a, "command"_a, "remote"_a, "callback"_a, + R"(Inject a callback as if it were a remotely invoked command. + +This method takes a callback and queues it to be invoked as if it had been called remotely. This allows +external command processing to be combined with oxenmq task scheduling. + +For example, oxen-core uses this to handle RPC requests coming in over HTTP as if they were incoming +OxenMQ RPC requests, with the same scheduling and queuing of requests applied to both HTTP and +OxenMQ requests so that both are treated fairly in terms of processing priority.)") + ; + + py::enum_(mod, "future_status") + .value("deferred", std::future_status::deferred) + .value("ready", std::future_status::ready) + .value("timeout", std::future_status::timeout); + py::class_>(mod, "ResultFuture", + "Wrapper around a C++ future allowing inspecting and waiting for the result to become available.") + .def("get", [](std::future& f) { + { + py::gil_scoped_release no_gil; + f.wait(); + } + return f.get(); + }, "Gets the result (or raises an exception if the result raised an exception); must only be called once") + .def("valid", [](std::future& f) { return f.valid(); }, + "Returns true if the result is available") + .def("wait", &std::future::wait, py::call_guard(), + "Waits indefinitely for the result to become available") + .def("wait_for", &std::future::template wait_for>, + py::call_guard(), + "Waits up to the given timedelta for the result to become available") + .def("wait_for", [](std::future& f, double seconds) { + return f.wait_for(std::chrono::duration{seconds}); }, + py::call_guard(), + "Waits up to the given number of seconds for the result to become available") + .def("wait_until", + &std::future::template wait_until, + py::call_guard(), + "Wait until the given datetime for the result to become available") + ; + + oxenmq.def("request_future", [](py::handle self, py::args args, py::kwargs kwargs) { + if (kwargs.contains("on_reply") || kwargs.contains("on_reply_failure")) + throw std::logic_error{"Cannot call request_wait(...) with on_reply= or on_reply_failure="}; + + auto result = std::make_shared>(); + auto fut = result->get_future(); + auto on_reply = [result](py::list value) { + assert(len(value) == 0 || py::isinstance(value[0])); + for (int i = len(value) - 1; i >= 0; i--) + value[i] = value[i].attr("tobytes")(); + }; + auto on_fail = [result](py::list value) { + if (len(value) > 0 && (std::string) py::bytes(value[0]) == "TIMEOUT"sv) { + auto msg = len(value) > 1 ? (std::string) py::bytes(value[1]) : "Request timed out"s; + PyErr_SetString(PyExc_TimeoutError, msg.c_str()); + result->set_exception(std::make_exception_ptr(py::error_already_set{})); + } + + std::string err; + for (auto& m : value) { + if (!err.empty()) err += ", "; + err += py::str(m); + } + result->set_exception(std::make_exception_ptr(std::runtime_error{"Request failed: " + err})); + }; + + self.attr("request")(*args, **kwargs, + "on_reply"_a = std::move(on_reply), + "on_reply_failure"_a = std::move(on_fail)); + return fut; + }, R"(Initiate a request with a future. + +Initiates a request and returns a future that is used to check and wait for a response to the +request. Takes the same arguments as .request(...), but without the `on_reply` and +`on_reply_failure` options. + +This can be used to make a synchronous request by simply calling .get() on the returned future: + + try: + response = omq.request_future(conn, "cat.cmd", "data").get() + except TimeoutError as e: + print("Request timed out!") + except Exception: + print("Request failed :(") + +More powerfully, you can issue multiple, parallel requests storing the returned futures then .get() +all of them to collect the responses.)"); +} + +} // namespace oxenmq From 0e1283784e4e2f2fad91ad1b58ecd75c4e7f1c13 Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Thu, 21 Oct 2021 23:47:58 -0300 Subject: [PATCH 02/11] Remove unwanted headers - pybind11/gil.h is only in recent pybind (but is implicitly included by the main pybind11.h header) - detail/common shouldn't be included --- src/oxenmq.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/oxenmq.cpp b/src/oxenmq.cpp index ac94e2f..105fd8d 100644 --- a/src/oxenmq.cpp +++ b/src/oxenmq.cpp @@ -5,9 +5,7 @@ #include #include #include -#include #include -#include #include #include #include From c4d9eb8a8f8dd5f70cba2f58d63498a66b35b802 Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Fri, 22 Oct 2021 00:45:15 -0300 Subject: [PATCH 03/11] De-Frankenstein the build --- CMakeLists.txt | 33 +++++++++++---------------------- external/pybind11 | 2 +- 2 files changed, 12 insertions(+), 23 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 3d3cf65..c86cedc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -11,23 +11,9 @@ project(pyoxenmq DESCRIPTION "python interface to OxenMQ" LANGUAGES CXX) -add_subdirectory(pybind11) - -pybind11_add_module(pyoxenmq MODULE - bencode.cpp - oxenmq.cpp - module.cpp - ) - -target_compile_features(pyoxenmq PRIVATE cxx_std_17) - -if(TARGET oxenmq::oxenmq) - target_link_libraries(pyoxenmq PRIVATE oxenmq::oxenmq) -else() - include(FindPkgConfig) - pkg_check_modules(oxenmq REQUIRED IMPORTED_TARGET liboxenmq>=1.2.8) - target_link_libraries(pyoxenmq PUBLIC PkgConfig::oxenmq) -endif() +set(CMAKE_CXX_STANDARD 17) +set(CMAKE_CXX_STANDARD_REQUIRED ON) +set(CMAKE_CXX_EXTENSIONS OFF) option(SUBMODULE_CHECK "Enables checking that vendored library submodules are up to date" ON) if(SUBMODULE_CHECK) @@ -51,13 +37,16 @@ endif() add_subdirectory(external/pybind11) -include(FindPkgConfig) -pkg_check_modules(oxenmq REQUIRED IMPORTED_TARGET GLOBAL liboxenmq) - pybind11_add_module(oxenmq MODULE src/bencode.cpp - src/bencode.cppoxenmq.cpp + src/oxenmq.cpp src/module.cpp ) -target_link_libraries(oxenmq PUBLIC PkgConfig::oxenmq) +if(TARGET oxenmq::oxenmq) + target_link_libraries(pyoxenmq PRIVATE oxenmq::oxenmq) +else() + include(FindPkgConfig) + pkg_check_modules(oxenmq REQUIRED IMPORTED_TARGET liboxenmq>=1.2.8) + target_link_libraries(oxenmq PRIVATE PkgConfig::oxenmq) +endif() diff --git a/external/pybind11 b/external/pybind11 index 3b1dbeb..97976c1 160000 --- a/external/pybind11 +++ b/external/pybind11 @@ -1 +1 @@ -Subproject commit 3b1dbebabc801c9cf6f0953a4c20b904d444f879 +Subproject commit 97976c16fb7652f7faf02d76756666ef87adbe7d From a62f6c4769f32335c5332d0dbe83b003b3749055 Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Fri, 22 Oct 2021 16:08:57 -0300 Subject: [PATCH 04/11] Formatting --- src/oxenmq.cpp | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/src/oxenmq.cpp b/src/oxenmq.cpp index 105fd8d..bf776b2 100644 --- a/src/oxenmq.cpp +++ b/src/oxenmq.cpp @@ -66,7 +66,7 @@ struct stderr_logger { constexpr auto noopt = [] {}; void -OxenMQ_Init(py::module & mod) +OxenMQ_Init(py::module& mod) { using namespace pybind11::literals; constexpr py::kw_only kwonly{}; @@ -114,11 +114,11 @@ If set to None then this address is set to use an unencrypted plaintext connecti .def_property_readonly("ipc", py::overload_cast<>(&address::ipc, py::const_), "true if this is an ipc address") .def_property_readonly("zmq_address", &address::zmq_address, "accesses the zmq address portion of the address (note that this does not contain any curve encryption information)") - .def_property_readonly("full_address", [](const address &a) { return a.full_address(address::encoding::base32z); }, + .def_property_readonly("full_address", [](const address& a) { return a.full_address(address::encoding::base32z); }, "returns the full address, including curve information, encoding the curve pubkey as base32z") - .def_property_readonly("full_address_b64", [](const address &a) { return a.full_address(address::encoding::base64); }, + .def_property_readonly("full_address_b64", [](const address& a) { return a.full_address(address::encoding::base64); }, "returns the full address, including curve information, encoding the curve pubkey as base64") - .def_property_readonly("full_address_hex", [](const address &a) { return a.full_address(address::encoding::hex); }, + .def_property_readonly("full_address_hex", [](const address& a) { return a.full_address(address::encoding::hex); }, "returns the full address, including curve information, encoding the curve pubkey as hex") .def_property_readonly("qr", &address::qr_address, R"(Access the full address as a RQ-encoding optimized string. @@ -261,7 +261,7 @@ instance is still alive).)") "Helper class to add in registering category commands, returned from OxenMQ.add_category(...)") .def("add_command", &CatHelper::add_command) .def("add_request_command", - [](CatHelper &cat, + [](CatHelper& cat, std::string name, std::function handler) { @@ -312,7 +312,12 @@ callback itself.)") // function, but that deadlocks pretty much right away because of the crappiness of the gil. return std::make_unique(stderr_logger{}, level); })) - .def(py::init([](py::bytes pubkey, py::bytes privkey, bool sn, OxenMQ::SNRemoteAddress sn_lookup, std::optional log_level) { + .def(py::init([]( + py::bytes pubkey, + py::bytes privkey, + bool sn, + OxenMQ::SNRemoteAddress sn_lookup, + std::optional log_level) { return std::make_unique(pubkey, privkey, sn, std::move(sn_lookup), log_level ? OxenMQ::Logger{stderr_logger{}} : nullptr, log_level.value_or(LogLevel::warn)); From 6a382865b23aba8fd7244884578b87ace83d68ae Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Fri, 22 Oct 2021 16:09:36 -0300 Subject: [PATCH 05/11] Remove cmake build system and submodules setup.py works fine, and we don't need two separate build systems. Remove the submodules as well because we aren't using them and don't need them; you should install pybind11 and oxenmq outside this project. --- .gitmodules | 8 -------- CMakeLists.txt | 52 ----------------------------------------------- external/oxen-mq | 1 - external/pybind11 | 1 - pyproject.toml | 2 +- setup.py | 1 - 6 files changed, 1 insertion(+), 64 deletions(-) delete mode 100644 .gitmodules delete mode 100644 CMakeLists.txt delete mode 160000 external/oxen-mq delete mode 160000 external/pybind11 diff --git a/.gitmodules b/.gitmodules deleted file mode 100644 index 4f8ea2e..0000000 --- a/.gitmodules +++ /dev/null @@ -1,8 +0,0 @@ -[submodule "external/pybind11"] - path = external/pybind11 - url = https://github.com/pybind/pybind11 - branch = stable -[submodule "external/oxen-mq"] - path = external/oxen-mq - url = https://github.com/oxen-io/oxen-mq - branch = dev diff --git a/CMakeLists.txt b/CMakeLists.txt deleted file mode 100644 index c86cedc..0000000 --- a/CMakeLists.txt +++ /dev/null @@ -1,52 +0,0 @@ -cmake_minimum_required(VERSION 3.10) # bionic's cmake version - - -find_program(CCACHE_PROGRAM ccache) -if(CCACHE_PROGRAM) - set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE "${CCACHE_PROGRAM}") -endif() - -project(pyoxenmq - VERSION 1.0.0 - DESCRIPTION "python interface to OxenMQ" - LANGUAGES CXX) - -set(CMAKE_CXX_STANDARD 17) -set(CMAKE_CXX_STANDARD_REQUIRED ON) -set(CMAKE_CXX_EXTENSIONS OFF) - -option(SUBMODULE_CHECK "Enables checking that vendored library submodules are up to date" ON) -if(SUBMODULE_CHECK) - find_package(Git) - if(GIT_FOUND) - function(check_submodule relative_path) - execute_process(COMMAND git rev-parse "HEAD" WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/${relative_path} OUTPUT_VARIABLE localHead) - execute_process(COMMAND git rev-parse "HEAD:${relative_path}" WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} OUTPUT_VARIABLE checkedHead) - string(COMPARE EQUAL "${localHead}" "${checkedHead}" upToDate) - if (upToDate) - message(STATUS "Submodule '${relative_path}' is up-to-date") - else() - message(FATAL_ERROR "Submodule '${relative_path}' is not up-to-date. Please update with\ngit submodule update --init --recursive\nor run cmake with -DSUBMODULE_CHECK=OFF") - endif() - endfunction () - - check_submodule(external/pybind11) - check_submodule(external/oxen-mq) - endif() -endif() - -add_subdirectory(external/pybind11) - -pybind11_add_module(oxenmq MODULE - src/bencode.cpp - src/oxenmq.cpp - src/module.cpp -) - -if(TARGET oxenmq::oxenmq) - target_link_libraries(pyoxenmq PRIVATE oxenmq::oxenmq) -else() - include(FindPkgConfig) - pkg_check_modules(oxenmq REQUIRED IMPORTED_TARGET liboxenmq>=1.2.8) - target_link_libraries(oxenmq PRIVATE PkgConfig::oxenmq) -endif() diff --git a/external/oxen-mq b/external/oxen-mq deleted file mode 160000 index dccbd1e..0000000 --- a/external/oxen-mq +++ /dev/null @@ -1 +0,0 @@ -Subproject commit dccbd1e8cdb9f57206077facdf973c8e86fc6bec diff --git a/external/pybind11 b/external/pybind11 deleted file mode 160000 index 97976c1..0000000 --- a/external/pybind11 +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 97976c16fb7652f7faf02d76756666ef87adbe7d diff --git a/pyproject.toml b/pyproject.toml index ab05f30..89ae1ed 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -2,7 +2,7 @@ requires = [ "setuptools>=42", "wheel", - "pybind11>=2.4.0", + "pybind11>=2.6.0", ] build-backend = "setuptools.build_meta" diff --git a/setup.py b/setup.py index c6373f1..7e7d60b 100644 --- a/setup.py +++ b/setup.py @@ -26,6 +26,5 @@ setup( description="Python wrapper for oxen-mq message passing library", long_description="", ext_modules=ext_modules, - cmdclass={"build_ext": build_ext}, zip_safe=False, ) From 5ceed8a416534b88c0456175f132ea38482eddd5 Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Fri, 22 Oct 2021 16:27:54 -0300 Subject: [PATCH 06/11] Various fixes for bugs revealed in testing - make extract data parts always flatten to arbitrary depth. We were only flattening once (and only sometimes), but arbitrary flattening is nicer (as long as we don't ultimately encouter anything that isn't str/bytes). - fix Message.data()/.dataview() pre-loading nulls into the list. The py::list constructor argument here is pre-fill, not a reservation. - fix add_command: it was broken because when the wrapped callback gets invoked it tries casting the Message via copying, but that breaks (Message isn't copyable). Fix it by adding a wrapper that does a referencing cast to a Python object. - Add missing docs for add_command - Doc typo fixes - Remove OxenMQ default and loglevel constructors, and make the remaining constructor take everything by keyword arguments. (Default constructor remains equivalent, and the log-level constructor now has to have the log level named). - Fix OxenMQ keyword constructor: invoking it was failing because the annotation default on sn_lookup wasn't properly castable to the required types (it should be py::none() rather than nullptr). Also fix the defaults for pubkey/privkey to be py::bytes() (the empty string worked fine, but using bytes makes it show up as bytes defaults in the generated signature doc string). - Further document connect_inproc with a description of why you might want it. - Fix send not actually including the message data parts - Work around segfault in the reply callback: because we have lambdas with python object captures, we'd segfault in the proxy thread when it frees them after calling them because that freeing invokes python destructor but it doesn't hold the gil. Fixed by making the lambda consume its own callbacks (which is fine because oxenmq will invoke the callback exactly once). - Fix request_future invocation which raised an error on invocation: straight lambdas aren't castable to python objects, so stuff the lambda inside std::functions, which are. --- src/oxenmq.cpp | 84 ++++++++++++++++++++++++++++---------------------- 1 file changed, 48 insertions(+), 36 deletions(-) diff --git a/src/oxenmq.cpp b/src/oxenmq.cpp index bf776b2..0e21e2c 100644 --- a/src/oxenmq.cpp +++ b/src/oxenmq.cpp @@ -28,14 +28,8 @@ void extract_data_parts(std::vector& parts, py::handle obj) { else if (py::isinstance(obj)) parts.push_back(obj.cast()); else if (py::isinstance(obj)) { - for (auto o : obj) { - if (py::isinstance(o)) - parts.push_back(o.cast()); - else if (py::isinstance(o)) - parts.push_back(o.cast()); - else - throw std::runtime_error{"invalid iterable containing '" + std::string{py::repr(o)} + "': expected bytes/str"}; - } + for (auto o : obj) + extract_data_parts(parts, o); } else { throw std::runtime_error{"invalid value '" + std::string{py::repr(obj)} + "': expected bytes/str/iterable"}; } @@ -45,12 +39,6 @@ std::vector extract_data_parts(py::handle obj) { extract_data_parts(parts, obj); return parts; } -std::vector extract_data_parts(py::args& args) { - std::vector data; - for (auto arg: args) - extract_data_parts(data, arg); - return data; -} // Quick and dirty logger that logs to stderr. It would be much nicer to take a python function, // but that deadlocks pretty much right away because of the crappiness of the gil. @@ -173,7 +161,7 @@ Typically the IP address string for TCP connections and "localhost:UID:GID:PID" .def_readonly("access", &Message::access, "The access level of the invoker (which can be higher than the access level required for the command category") .def("dataview", [](const Message& m) { - py::list l{m.data.size()}; + py::list l; for (auto& part : m.data) l.append(py::memoryview::from_memory(part.data(), part.size())); return l; @@ -185,7 +173,7 @@ Message; if you need them beyond that then you must copy them (e.g. by calling m or .to_bytes() on each one)" ) .def("data", [](const Message& m) { - py::list l{m.data.size()}; + py::list l; for (auto& part : m.data) l.append(py::bytes{part.data(), part.size()}); return l; @@ -259,7 +247,20 @@ instance is still alive).)") py::class_(mod, "Category", "Helper class to add in registering category commands, returned from OxenMQ.add_category(...)") - .def("add_command", &CatHelper::add_command) + .def("add_command", [](CatHelper& cat, std::string name, py::function cb) { + return cat.add_command(name, [cb=std::move(cb)](Message& m) { + py::gil_scoped_acquire gil; + cb(&m); + }); + }, + "name"_a, "callback"_a, + R"(Add a command handler to this category. + +Adds a command, that is a command that is typically some sort of instruction that requires no reply. +(For a more typically request-response interface use .add_request_command instead). + +The callback is passed a `Message` object containing details of the received message. Note that +this object must *not* be stored beyond the callback itself; see `Message` for details.)") .def("add_request_command", [](CatHelper& cat, std::string name, @@ -283,14 +284,17 @@ instance is still alive).)") } msg.send_reply(send_option::data_parts(result)); }); + return &cat; }, + "name"_a, "handler"_a, R"(Add a request command to this category. Adds a request command, that is, a command that is always expected to reply, to this category. The -callback must return one of: +callback is passed a Message object containing details of the required message and must return one +of: - None - no reply will be sent; typically returned because you sent it yourself (via - Message.send_reply()), or because you want to send it later via Message.send_later(). + Message.reply()), or because you want to send it later via Message.later(). - bytes - will be sent as is in a single-part reply. - str - will be sent in utf-8 encoding in a single-part reply. - iterable object containing bytes and/or str elements: will be sent as a multi-part reply where @@ -306,12 +310,6 @@ callback itself.)") py::class_ oxenmq{mod, "OxenMQ"}; oxenmq - .def(py::init<>()) - .def(py::init([](LogLevel level) { - // Quick and dirty logger that logs to stderr. It would be much nicer to take a python - // function, but that deadlocks pretty much right away because of the crappiness of the gil. - return std::make_unique(stderr_logger{}, level); - })) .def(py::init([]( py::bytes pubkey, py::bytes privkey, @@ -322,8 +320,9 @@ callback itself.)") log_level ? OxenMQ::Logger{stderr_logger{}} : nullptr, log_level.value_or(LogLevel::warn)); }), - "pubkey"_a = "", "privkey"_a = "", "service_node"_a = false, - "sn_lookup"_a = nullptr, "log_level"_a = LogLevel::warn, + kwonly, + "pubkey"_a = py::bytes(), "privkey"_a = py::bytes(), "service_node"_a = false, + "sn_lookup"_a = py::none(), "log_level"_a = py::none(), R"(OxenMQ constructor. This constructs the object but does not start it; you will typically want to first add categories @@ -691,6 +690,9 @@ Connects to the built-in in-process listening socket of this OxenMQ server for l Note that auth_level defaults to admin (unlike connect_remote), and the default timeout is much shorter. +This connection is designed to allow code within the same process to invoke registered commands via +the OxenMQ object. The connection works whether or not there are any accessible external listeners. + Also note that incoming inproc requests are unauthenticated: that is, they will always have admin-level access. )") @@ -751,29 +753,38 @@ the background).)") extract_data_parts(data, arg); if (!request) { - self.send(std::get(conn), command, + self.send(std::get(conn), command, send_option::data_parts(data), hint, optional, incoming, outgoing, keep_alive, request_timeout, std::move(qfail), std::move(qfull)); } else { auto reply_cb = [on_reply = std::move(on_reply), on_fail = std::move(on_reply_failure)] - (bool success, std::vector data) { - - if (success ? !on_reply : !on_fail) - return; + (bool success, std::vector data) mutable { + // The gil here makes things tricky: the function invocation itself is + // already gil protected, but the *destruction* of the lambda isn't, and + // that breaks things because the destruction frees a python reference to + // the callback. However oxenmq invokes this callback exactly once so we + // can deal with it by stealing the captures out of the lambda to force + // destruction here, with the gil held. py::gil_scoped_acquire gil; + auto reply = std::move(on_reply); + auto fail = std::move(on_fail); + + if (success ? !reply : !fail) + return; py::list l; if (success) { for (const auto& part : data) l.append(py::memoryview::from_memory(part.data(), part.size())); - (*on_reply)(l); + (*reply)(l); } else if (on_fail) { for (const auto& part : data) l.append(py::bytes(part.data(), part.size())); - (*on_fail)(l); + (*fail)(l); } }; self.request(std::get(conn), command, std::move(reply_cb), + send_option::data_parts(data), hint, optional, incoming, outgoing, keep_alive, request_timeout, std::move(qfail), std::move(qfull)); } @@ -910,12 +921,13 @@ OxenMQ requests so that both are treated fairly in terms of processing priority. auto result = std::make_shared>(); auto fut = result->get_future(); - auto on_reply = [result](py::list value) { + std::function on_reply = [result](py::list value) { assert(len(value) == 0 || py::isinstance(value[0])); for (int i = len(value) - 1; i >= 0; i--) value[i] = value[i].attr("tobytes")(); + result->set_value(std::move(value)); }; - auto on_fail = [result](py::list value) { + std::function on_fail = [result](py::list value) { if (len(value) > 0 && (std::string) py::bytes(value[0]) == "TIMEOUT"sv) { auto msg = len(value) > 1 ? (std::string) py::bytes(value[1]) : "Request timed out"s; PyErr_SetString(PyExc_TimeoutError, msg.c_str()); From 171e94cf1105df252770fef763aeed1136499a71 Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Fri, 22 Oct 2021 16:28:53 -0300 Subject: [PATCH 07/11] Add simple pytest tests --- pytest.ini | 3 + tests/test_messages.py | 126 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 129 insertions(+) create mode 100644 pytest.ini create mode 100644 tests/test_messages.py diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..e618d7a --- /dev/null +++ b/pytest.ini @@ -0,0 +1,3 @@ +[pytest] +testpaths = + tests diff --git a/tests/test_messages.py b/tests/test_messages.py new file mode 100644 index 0000000..34bc40c --- /dev/null +++ b/tests/test_messages.py @@ -0,0 +1,126 @@ + +from oxenmq import OxenMQ, Message, LogLevel, AuthLevel, Address +import random +import string +from datetime import datetime, timedelta +import time +import pytest + + +def echo(m: Message): + return 'Hi!', m.data() + + +def ohce(m: Message): + return [bytes(reversed(x)) for x in reversed(m.data())], '!iH' + + +@pytest.fixture(autouse=True) +def zmq_address(): + import os + sock = './' + ''.join(random.choices(string.ascii_letters, k=20)) + '.sock' + addr = 'ipc://' + sock + yield addr + os.remove(sock) + + +def make_omqs(zmq_addr, start=True): + omq1 = OxenMQ(log_level=LogLevel.trace) + + addr = Address(zmq_addr, omq1.pubkey) + + omq1.listen(addr.zmq_address, curve=True) + + omq1.add_category('cat', AuthLevel.none) \ + .add_request_command('echo', echo) \ + .add_request_command('ohce', ohce) + + omq2 = OxenMQ(log_level=LogLevel.trace) + + if start: + omq1.start() + omq2.start() + + return omq1, omq2, addr + + +def test_requests(zmq_address): + omq1, omq2, addr = make_omqs(zmq_address) + + reply1, reply2 = None, None + + def on_reply1(r): + nonlocal reply1 + reply1 = [x.tobytes().decode() for x in r] + + def on_reply2(r): + nonlocal reply2 + reply2 = [x.tobytes().decode() for x in r] + + c1 = omq2.connect_remote(addr) + omq2.request(c1, 'cat.echo', 'fee', 'fi', 'fo', 'fum', on_reply=on_reply1) + omq2.request(c1, 'cat.ohce', 'fee', 'fi', 'fo', 'fum', on_reply=on_reply2) + + timeout = datetime.now() + timedelta(seconds=0.5) + while None in (reply1, reply2) and datetime.now() < timeout: + time.sleep(0.01) + + assert reply1 == ['Hi!', 'fee', 'fi', 'fo', 'fum'] + assert reply2 == ['muf', 'of', 'if', 'eef', '!iH'] + + +def test_request_future(zmq_address): + omq1, omq2, addr = make_omqs(zmq_address) + c1 = omq2.connect_remote(addr) + + reply3 = [x.decode() for x in omq2.request_future(c1, 'cat.echo', 'xyz').get()] + assert reply3 == ['Hi!', 'xyz'] + + +def test_commands(zmq_address): + omq1, omq2, addr = make_omqs(zmq_address, start=False) + + val1, val2, val3 = None, None, None + defer = None + + def cmd1(m): + nonlocal val1 + val1 = ['CMD1 got'] + m.data() + m.back("x.x", "asdf", b'\x00\x01\x02', "jkl;") + + def cmd2(m): + nonlocal val2, defer + val2 = ['CMD2 got'] + m.data() + defer = m.later() + + def cmd_later(m): + nonlocal defer, val3 + val3 = ['CMD-later got'] + m.data() + + omq1.add_category("x", AuthLevel.none) \ + .add_command("y", cmd1) \ + .add_command("z", cmd_later) + + omq2.add_category("x", AuthLevel.none) \ + .add_command("x", cmd2) + + omq1.start() + omq2.start() + c1 = omq2.connect_remote(addr) + omq2.send(c1, 'x.y', b'\0', 'M', 'G') + + timeout = datetime.now() + timedelta(seconds=0.5) + while None in (val1, val2) and datetime.now() < timeout: + time.sleep(0.01) + + assert val1 == ['CMD1 got', b'\0', b'M', b'G'] + assert val2 == ['CMD2 got', b'asdf', b'\0\1\2', b'jkl;'] + assert val3 is None + assert defer is not None + + defer.back("x.z", "cool") + timeout = datetime.now() + timedelta(seconds=0.5) + while val3 is None and datetime.now() < timeout: + time.sleep(0.01) + assert val3 == ['CMD-later got', b'cool'] + From 996370228c46c938199d1070f3b98f537d6bc027 Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Fri, 22 Oct 2021 16:40:14 -0300 Subject: [PATCH 08/11] Add drone build & tests --- .drone.jsonnet | 97 ++++++++++++++++++++++++++++++++++++++++ contrib/deb.oxen.io.gpg | Bin 0 -> 2213 bytes 2 files changed, 97 insertions(+) create mode 100644 .drone.jsonnet create mode 100644 contrib/deb.oxen.io.gpg diff --git a/.drone.jsonnet b/.drone.jsonnet new file mode 100644 index 0000000..2a2774c --- /dev/null +++ b/.drone.jsonnet @@ -0,0 +1,97 @@ +local default_deps_base = [ + 'python3-dev', + 'python3-setuptools', + 'pybind11-dev', + 'python3-pybind11', + 'liboxenmq-dev', + 'python3-pytest', + 'python3-pip', +]; +local default_deps = ['g++'] + default_deps_base; +local docker_base = 'registry.oxen.rocks/lokinet-ci-'; + +local apt_get_quiet = 'apt-get -o=Dpkg::Use-Pty=0 -q'; + +// Regular build on a debian-like system: +local debian_pipeline(name, + image, + arch='amd64', + deps=default_deps, + extra_cmds=[], + jobs=6, + loki_repo=true, + allow_fail=false) = { + kind: 'pipeline', + type: 'docker', + name: name, + platform: { arch: arch }, + steps: [ + { + name: 'build', + image: image, + pull: 'always', + [if allow_fail then 'failure']: 'ignore', + environment: { SSH_KEY: { from_secret: 'SSH_KEY' } }, + commands: [ + 'echo "Building on ${DRONE_STAGE_MACHINE}"', + 'echo "man-db man-db/auto-update boolean false" | debconf-set-selections', + apt_get_quiet + ' update', + apt_get_quiet + ' install -y eatmydata', + ] + ( + if loki_repo then [ + 'eatmydata ' + apt_get_quiet + ' install --no-install-recommends -y lsb-release', + 'cp contrib/deb.oxen.io.gpg /etc/apt/trusted.gpg.d', + 'echo deb http://deb.oxen.io $$(lsb_release -sc) main >/etc/apt/sources.list.d/oxen.list', + 'echo deb http://deb.oxen.io/beta $$(lsb_release -sc) main >>/etc/apt/sources.list.d/oxen.list', + 'echo deb http://deb.oxen.io/staging $$(lsb_release -sc) main >>/etc/apt/sources.list.d/oxen.list', + 'eatmydata ' + apt_get_quiet + ' update', + ] else [] + ) + [ + 'eatmydata ' + apt_get_quiet + ' dist-upgrade -y', + 'eatmydata ' + apt_get_quiet + ' install --no-install-recommends -y ' + std.join(' ', deps), + 'pip3 install . -v', + 'py.test-3 -v --color=yes', + ] + + extra_cmds, + }, + ], +}; + +// Macos build +local mac_builder(name, + extra_cmds=[], + jobs=6, + allow_fail=false) = { + kind: 'pipeline', + type: 'exec', + name: name, + platform: { os: 'darwin', arch: 'amd64' }, + steps: [ + { + name: 'build', + environment: { SSH_KEY: { from_secret: 'SSH_KEY' } }, + commands: [ + 'echo "Building on ${DRONE_STAGE_MACHINE}"', + 'python3 -mpip install . -v', + 'python3 -mpytest -v --color=yes', + ] + extra_cmds, + }, + ], +}; + + +[ + // Various debian builds + debian_pipeline('Debian sid (amd64)', docker_base + 'debian-sid'), + debian_pipeline('Debian stable (i386)', docker_base + 'debian-stable/i386'), + debian_pipeline('Debian buster (amd64)', docker_base + 'debian-buster'), + debian_pipeline('Ubuntu latest (amd64)', docker_base + 'ubuntu-rolling'), + debian_pipeline('Ubuntu LTS (amd64)', docker_base + 'ubuntu-lts'), + + // ARM builds (ARM64 and armhf) + debian_pipeline('Debian sid (ARM64)', docker_base + 'debian-sid', arch='arm64', jobs=4), + debian_pipeline('Debian stable (armhf)', docker_base + 'debian-stable/arm32v7', arch='arm64', jobs=4), + + // Macos builds: + mac_builder('macOS'), +] diff --git a/contrib/deb.oxen.io.gpg b/contrib/deb.oxen.io.gpg new file mode 100644 index 0000000000000000000000000000000000000000..6ec4e25ad9a5f27c4cf558a455888e8087153f9d GIT binary patch literal 2213 zcmV;W2wL}<0u2OKF+Xtu5CE?&fm>>)yi@QSe>CUvyh@(yR~rMoHPnm@yQ)}lpqkMf zD?8<&y2iLh@H(Ncc^4S;)>v2`VyM%K48uy?Hm4i`cW^n<9xI`P`k|xhmc>GxhsBVaU3%-K%Fsi@4anlD}?+XzVZtkUWlK9Oy%mk zK7NJ7?iE9>EM#<>R_NBkXI4vd5`1Gj8c8HH-FT86?#Gj^%?08cQf1IR5jG%#ZXgKD zg+Wm2dD1nIZw1&kNqF&k4y1w3T?Z!kOVBFE$wZE|I{8`aD5(CY7o+UJ`tv_;u9n%H zLS2uAMJ~0yKh)T8i(NpiJ{GDQvMlARUq%>H-AVyWlL=<0++mOSL2=r#=q5_e&WBf; zo&cKEaSF{7hOIr}DX^5LQn;H*IyF_fD_aFKZ_sf)Ym5_koQX6yb{5@~!lTDQ<^?VT zVo*=fO@xy|MABwa>{N&SlGJ~huxV7Pn_x{@UMePG_`syb`&$w4v&h{cr;FlK-#A>D z*0tfCuxf-a)w(5@t{0-vrJ;fjb;tglI<`T{ID6>1juEPmvTExc^@koU#T15Y}5 z3~z8-&kGw-w@s&0VZR?_NNKBxqMH3i+@}mt`|88~?KFm>Tjd5%y!(vW1aJT{p6v=P zP@;vjY{;5gEvo<#0RRECCQ4y*Z*Cw`XlZU`Y+-I>WpW@qYGHG4Za`^mVP|P>VRCsc zV_`mt0yqQ{0RjLb1p-zvKXC#Z0|pBT2nPcK1{DYb2?`4Y76JnS0v-VZ7k~f?2@u4Y zEa#|=LSko35B@W1W>tT)7-CsDjswdHnH&BedhVRfMKUq~nh`E@@ORw+{kDqoYyN@! zv#Q3XiR0yopGKib1^oxCZLVf5L2Jq-{BS@h4qAgo`gIU5u z3nOqmW;@|rm3j*7wg^ow8P%~v?nE8%c^gm^Se&vRS zn(HdcusYW?Xj|3iST3E%P`n#rK%=t}*OxHHHf*%LoP5m&xX^C(4{MgfP?wTHwCG!N z3N*Q6#p){(kK;L73f`Pm)tT*AB8@_2V$E(!Y%5vb>gFK_%xpKpx9z*RjSaMn0jW~ z4*AeBu^e5Z`LSf*3t&sUbXXHNS<|H3XZ!00r*GIMm(r5I)ucUp*m^EQ)<`#6 zE*&R?6q%5idYuu)WK~YtKAJw4i^hWl;L=n1HDh*b1>Z#(?DPEacNHqunS2=ZD`e8299CYx~rbxm&e9Ypjw&6RWWo0xevuI#5Y4v zg7rl1xZsxECMmVcJ{a(f$$%5b5@jJ`FT0Rjmm3bHoe{=}WmJkT(L zTdoAcYT@XJCaEd}4~t+Fp6OY9zmyjI1E>1i8xX?$ERz{#`AKNn09 zn|>{TqRl@|vyy;v?sno*#LE+q$cNQ%re3_6a~0olmz)^0IlkdT?ZJ$NF9MCa9Zq31 z0CWd+&oxI*h_%@gf;#r)aV=3aMq-o$#iYHG1&XdPa>MBAQ>$C2IZ89ZPRy~}oNlIEZ)O@& zTAQzFr0$^a2iHSM2)P#AGGz4gglF!^bwg4<5h4{O2DQ>GoDE!hzSQFIM(+u+{Yd12 zQ9#higkbklZ`(i{z`>sATHHV8DP_vp+Ay4#6dr=F+BM|@FPRe4jyZi_EWv@^(kR-O z|CSgF*+&*w@C?x?uU3p7^im$a6U$0ZvQi0%{|rtN!nDA|ok|BysaU%9aEB5@BBiDq nq1%?36-cF__u)W3ywPi5!&0SfcP(HNlEj8=N;Vhp-76(J2Bs0L literal 0 HcmV?d00001 From 97238b04ba79647f84dbaf0cae474771ba7d590e Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Fri, 22 Oct 2021 18:33:35 -0300 Subject: [PATCH 09/11] Hacking around for macos Build local oxenmq Install everything into a local prefix --- .drone.jsonnet | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/.drone.jsonnet b/.drone.jsonnet index 2a2774c..72882cb 100644 --- a/.drone.jsonnet +++ b/.drone.jsonnet @@ -72,8 +72,18 @@ local mac_builder(name, environment: { SSH_KEY: { from_secret: 'SSH_KEY' } }, commands: [ 'echo "Building on ${DRONE_STAGE_MACHINE}"', - 'python3 -mpip install . -v', - 'python3 -mpytest -v --color=yes', + 'mkdir prefix', + 'git clone https://github.com/oxen-io/oxen-mq.git', + 'cd oxen-mq', + 'git submodule update --init --recursive --depth=1', + 'mkdir build && cd build', + 'cmake .. -DOXENMQ_BUILD_TESTS=OFF -DCMAKE_INSTALL_PREFIX=../../prefix -G Ninja', + 'ninja install -j' + jobs, + 'cd ../..', + 'LDFLAGS="-L$$PWD/prefix/lib -L/opt/local/lib" ' + + 'CFLAGS="-Wextra -Werror -fcolor-diagnostics -I$$PWD/prefix/include -I/opt/local/include" ' + + 'python3 -mpip install . -v --prefix=./prefix', + 'DYLD_LIBRARY_PATH=$$PWD/prefix/lib PYTHONPATH=$$(ls -1d $$PWD/prefix/lib/python3*/site-packages) python3 -mpytest -v --color=yes', ] + extra_cmds, }, ], From 6d59248f450338286bcb2a27110c471089302496 Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Fri, 22 Oct 2021 18:35:18 -0300 Subject: [PATCH 10/11] Build with -Wextra -Werror and color --- .drone.jsonnet | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.drone.jsonnet b/.drone.jsonnet index 72882cb..75e29a0 100644 --- a/.drone.jsonnet +++ b/.drone.jsonnet @@ -49,7 +49,7 @@ local debian_pipeline(name, ) + [ 'eatmydata ' + apt_get_quiet + ' dist-upgrade -y', 'eatmydata ' + apt_get_quiet + ' install --no-install-recommends -y ' + std.join(' ', deps), - 'pip3 install . -v', + 'CFLAGS="-Wextra -Werror -fdiagnostics-color" pip3 install . -v', 'py.test-3 -v --color=yes', ] + extra_cmds, From b12c754895218a24dc6e2bea78bdc9f836e5075a Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Fri, 22 Oct 2021 18:44:20 -0300 Subject: [PATCH 11/11] Fixes and address warnings - timeout in the syncronous connect_remote wasn't being passed through - callback and syncronous connect_remote had some deviation in args; unified them to both take keyword-only timeout and ephemeral_routing_id, and to take timeout as a timedelta with a default (rather than an optional in the callback-based one). - removed unused "noopt". I forget what it was for, but it didn't end up being used. --- src/oxenmq.cpp | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/src/oxenmq.cpp b/src/oxenmq.cpp index 0e21e2c..1bd700d 100644 --- a/src/oxenmq.cpp +++ b/src/oxenmq.cpp @@ -51,8 +51,6 @@ struct stderr_logger { } }; -constexpr auto noopt = [] {}; - void OxenMQ_Init(py::module& mod) { @@ -606,17 +604,17 @@ permissions: in this example, the required permissions the access the endpoint w const address& remote, OxenMQ::ConnectSuccess on_success, OxenMQ::ConnectFailure on_failure, - std::optional timeout, + std::chrono::milliseconds timeout, std::optional ephemeral_routing_id) { return self.connect_remote(remote, std::move(on_success), std::move(on_failure), - connect_option::timeout{timeout.value_or(oxenmq::REMOTE_CONNECT_TIMEOUT)}, + connect_option::timeout{timeout}, connect_option::ephemeral_routing_id{ephemeral_routing_id.value_or(self.EPHEMERAL_ROUTING_ID)} ); }, "remote"_a, "on_success"_a, "on_failure"_a, kwonly, - "timeout"_a = std::nullopt, "ephemeral_routing_id"_a = std::nullopt, + "timeout"_a = oxenmq::REMOTE_CONNECT_TIMEOUT, "ephemeral_routing_id"_a = std::nullopt, R"( Starts connecting to a remote address and return immediately. The connection can be used immediately, however messages will only be queued until the connection is established (or dropped if @@ -625,7 +623,10 @@ the connection fails). The given callbacks are invoked for success or failure. `ephemeral_routing_id` and `timeout` allowing overriding the defaults (oxenmq.EPHEMERAL_ROUTING_ID and 10s, respectively). )") - .def("connect_remote", [](OxenMQ& self, const address& remote, std::chrono::milliseconds timeout) { + .def("connect_remote", [](OxenMQ& self, + const address& remote, + std::chrono::milliseconds timeout, + std::optional ephemeral_routing_id) { std::promise promise; self.connect_remote( remote, @@ -633,9 +634,12 @@ and 10s, respectively). [&promise](auto, std::string_view reason) { promise.set_exception(std::make_exception_ptr( std::runtime_error{"Connection failed: " + std::string{reason}})); - }); + }, + oxenmq::connect_option::timeout{timeout}, + connect_option::ephemeral_routing_id{ephemeral_routing_id.value_or(self.EPHEMERAL_ROUTING_ID)} + ); return promise.get_future().get(); - }, "remote"_a, "timeout"_a = oxenmq::REMOTE_CONNECT_TIMEOUT, + }, "remote"_a, "timeout"_a = oxenmq::REMOTE_CONNECT_TIMEOUT, "ephemeral_routing_id"_a = std::nullopt, R"(Simpler version of connect_remote that connects to a remote address synchronously. This will block until the connection is established or times out; throws on connection failure,