commit
06e0e2eb44
|
@ -0,0 +1,107 @@
|
|||
local default_deps_base = [
|
||||
'python3-dev',
|
||||
'python3-setuptools',
|
||||
'pybind11-dev',
|
||||
'python3-pybind11',
|
||||
'liboxenmq-dev',
|
||||
'python3-pytest',
|
||||
'python3-pip',
|
||||
];
|
||||
local default_deps = ['g++'] + default_deps_base;
|
||||
local docker_base = 'registry.oxen.rocks/lokinet-ci-';
|
||||
|
||||
local apt_get_quiet = 'apt-get -o=Dpkg::Use-Pty=0 -q';
|
||||
|
||||
// Regular build on a debian-like system:
|
||||
local debian_pipeline(name,
|
||||
image,
|
||||
arch='amd64',
|
||||
deps=default_deps,
|
||||
extra_cmds=[],
|
||||
jobs=6,
|
||||
loki_repo=true,
|
||||
allow_fail=false) = {
|
||||
kind: 'pipeline',
|
||||
type: 'docker',
|
||||
name: name,
|
||||
platform: { arch: arch },
|
||||
steps: [
|
||||
{
|
||||
name: 'build',
|
||||
image: image,
|
||||
pull: 'always',
|
||||
[if allow_fail then 'failure']: 'ignore',
|
||||
environment: { SSH_KEY: { from_secret: 'SSH_KEY' } },
|
||||
commands: [
|
||||
'echo "Building on ${DRONE_STAGE_MACHINE}"',
|
||||
'echo "man-db man-db/auto-update boolean false" | debconf-set-selections',
|
||||
apt_get_quiet + ' update',
|
||||
apt_get_quiet + ' install -y eatmydata',
|
||||
] + (
|
||||
if loki_repo then [
|
||||
'eatmydata ' + apt_get_quiet + ' install --no-install-recommends -y lsb-release',
|
||||
'cp contrib/deb.oxen.io.gpg /etc/apt/trusted.gpg.d',
|
||||
'echo deb http://deb.oxen.io $$(lsb_release -sc) main >/etc/apt/sources.list.d/oxen.list',
|
||||
'echo deb http://deb.oxen.io/beta $$(lsb_release -sc) main >>/etc/apt/sources.list.d/oxen.list',
|
||||
'echo deb http://deb.oxen.io/staging $$(lsb_release -sc) main >>/etc/apt/sources.list.d/oxen.list',
|
||||
'eatmydata ' + apt_get_quiet + ' update',
|
||||
] else []
|
||||
) + [
|
||||
'eatmydata ' + apt_get_quiet + ' dist-upgrade -y',
|
||||
'eatmydata ' + apt_get_quiet + ' install --no-install-recommends -y ' + std.join(' ', deps),
|
||||
'CFLAGS="-Wextra -Werror -fdiagnostics-color" pip3 install . -v',
|
||||
'py.test-3 -v --color=yes',
|
||||
]
|
||||
+ extra_cmds,
|
||||
},
|
||||
],
|
||||
};
|
||||
|
||||
// Macos build
|
||||
local mac_builder(name,
|
||||
extra_cmds=[],
|
||||
jobs=6,
|
||||
allow_fail=false) = {
|
||||
kind: 'pipeline',
|
||||
type: 'exec',
|
||||
name: name,
|
||||
platform: { os: 'darwin', arch: 'amd64' },
|
||||
steps: [
|
||||
{
|
||||
name: 'build',
|
||||
environment: { SSH_KEY: { from_secret: 'SSH_KEY' } },
|
||||
commands: [
|
||||
'echo "Building on ${DRONE_STAGE_MACHINE}"',
|
||||
'mkdir prefix',
|
||||
'git clone https://github.com/oxen-io/oxen-mq.git',
|
||||
'cd oxen-mq',
|
||||
'git submodule update --init --recursive --depth=1',
|
||||
'mkdir build && cd build',
|
||||
'cmake .. -DOXENMQ_BUILD_TESTS=OFF -DCMAKE_INSTALL_PREFIX=../../prefix -G Ninja',
|
||||
'ninja install -j' + jobs,
|
||||
'cd ../..',
|
||||
'LDFLAGS="-L$$PWD/prefix/lib -L/opt/local/lib" ' +
|
||||
'CFLAGS="-Wextra -Werror -fcolor-diagnostics -I$$PWD/prefix/include -I/opt/local/include" ' +
|
||||
'python3 -mpip install . -v --prefix=./prefix',
|
||||
'DYLD_LIBRARY_PATH=$$PWD/prefix/lib PYTHONPATH=$$(ls -1d $$PWD/prefix/lib/python3*/site-packages) python3 -mpytest -v --color=yes',
|
||||
] + extra_cmds,
|
||||
},
|
||||
],
|
||||
};
|
||||
|
||||
|
||||
[
|
||||
// Various debian builds
|
||||
debian_pipeline('Debian sid (amd64)', docker_base + 'debian-sid'),
|
||||
debian_pipeline('Debian stable (i386)', docker_base + 'debian-stable/i386'),
|
||||
debian_pipeline('Debian buster (amd64)', docker_base + 'debian-buster'),
|
||||
debian_pipeline('Ubuntu latest (amd64)', docker_base + 'ubuntu-rolling'),
|
||||
debian_pipeline('Ubuntu LTS (amd64)', docker_base + 'ubuntu-lts'),
|
||||
|
||||
// ARM builds (ARM64 and armhf)
|
||||
debian_pipeline('Debian sid (ARM64)', docker_base + 'debian-sid', arch='arm64', jobs=4),
|
||||
debian_pipeline('Debian stable (armhf)', docker_base + 'debian-stable/arm32v7', arch='arm64', jobs=4),
|
||||
|
||||
// Macos builds:
|
||||
mac_builder('macOS'),
|
||||
]
|
|
@ -1,8 +0,0 @@
|
|||
[submodule "external/pybind11"]
|
||||
path = external/pybind11
|
||||
url = https://github.com/pybind/pybind11
|
||||
branch = stable
|
||||
[submodule "external/oxen-mq"]
|
||||
path = external/oxen-mq
|
||||
url = https://github.com/oxen-io/oxen-mq
|
||||
branch = dev
|
|
@ -1,55 +0,0 @@
|
|||
cmake_minimum_required(VERSION 3.10) # bionic's cmake version
|
||||
|
||||
# Has to be set before `project()`, and ignored on non-macos:
|
||||
set(CMAKE_OSX_DEPLOYMENT_TARGET 10.13 CACHE STRING "macOS deployment target (Apple clang only)")
|
||||
|
||||
find_program(CCACHE_PROGRAM ccache)
|
||||
if(CCACHE_PROGRAM)
|
||||
set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE "${CCACHE_PROGRAM}")
|
||||
endif()
|
||||
|
||||
set(PROJECT_NAME pyoxenmq)
|
||||
project(${PROJECT_NAME}
|
||||
VERSION 0.1.0
|
||||
DESCRIPTION "pybind layer for oxenmq"
|
||||
LANGUAGES CXX)
|
||||
|
||||
|
||||
if(NOT CMAKE_BUILD_TYPE)
|
||||
set(CMAKE_BUILD_TYPE RelWithDebInfo)
|
||||
endif()
|
||||
|
||||
include(CheckCXXSourceCompiles)
|
||||
include(CheckLibraryExists)
|
||||
set(CMAKE_CXX_STANDARD 17)
|
||||
set(CMAKE_CXX_STANDARD_REQUIRED ON)
|
||||
set(CMAKE_CXX_EXTENSIONS OFF)
|
||||
|
||||
option(SUBMODULE_CHECK "Enables checking that vendored library submodules are up to date" ON)
|
||||
if(SUBMODULE_CHECK)
|
||||
find_package(Git)
|
||||
if(GIT_FOUND)
|
||||
function(check_submodule relative_path)
|
||||
execute_process(COMMAND git rev-parse "HEAD" WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/${relative_path} OUTPUT_VARIABLE localHead)
|
||||
execute_process(COMMAND git rev-parse "HEAD:${relative_path}" WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} OUTPUT_VARIABLE checkedHead)
|
||||
string(COMPARE EQUAL "${localHead}" "${checkedHead}" upToDate)
|
||||
if (upToDate)
|
||||
message(STATUS "Submodule '${relative_path}' is up-to-date")
|
||||
else()
|
||||
message(FATAL_ERROR "Submodule '${relative_path}' is not up-to-date. Please update with\ngit submodule update --init --recursive\nor run cmake with -DSUBMODULE_CHECK=OFF")
|
||||
endif()
|
||||
endfunction ()
|
||||
|
||||
check_submodule(external/pybind11)
|
||||
check_submodule(external/oxen-mq)
|
||||
endif()
|
||||
endif()
|
||||
|
||||
add_compile_options(-Wno-deprecated-declarations)
|
||||
|
||||
add_subdirectory(external/pybind11)
|
||||
|
||||
include(FindPkgConfig)
|
||||
pkg_check_modules(oxenmq REQUIRED IMPORTED_TARGET GLOBAL liboxenmq)
|
||||
|
||||
add_subdirectory(pyoxenmq)
|
|
@ -1,4 +1,4 @@
|
|||
include README.md LICENSE
|
||||
global-include CMakeLists.txt *.cmake
|
||||
recursive-include pyoxenmq *
|
||||
recursive-include src *
|
||||
recursive-include pybind11/include *.h
|
||||
|
|
Binary file not shown.
|
@ -1 +0,0 @@
|
|||
Subproject commit dccbd1e8cdb9f57206077facdf973c8e86fc6bec
|
|
@ -1 +0,0 @@
|
|||
Subproject commit 3b1dbebabc801c9cf6f0953a4c20b904d444f879
|
|
@ -1,10 +0,0 @@
|
|||
|
||||
pybind11_add_module(pyoxenmq MODULE
|
||||
bencode.cpp
|
||||
oxenmq.cpp
|
||||
module.cpp
|
||||
)
|
||||
|
||||
target_link_libraries(pyoxenmq PUBLIC PkgConfig::oxenmq)
|
||||
|
||||
|
|
@ -1,16 +0,0 @@
|
|||
#include "common.hpp"
|
||||
#include "oxenmq/base32z.h"
|
||||
|
||||
namespace oxenmq
|
||||
{
|
||||
void
|
||||
BEncode_Init(py::module & mod)
|
||||
{
|
||||
mod.def("base32z_encode", [](py::bytes data) {
|
||||
char * ptr = nullptr;
|
||||
py::ssize_t sz = 0;
|
||||
PyBytes_AsStringAndSize(data.ptr(), &ptr, &sz);
|
||||
return oxenmq::to_base32z(ptr, ptr+sz);
|
||||
});
|
||||
}
|
||||
}
|
|
@ -1,7 +0,0 @@
|
|||
#include "common.hpp"
|
||||
|
||||
PYBIND11_MODULE(pyoxenmq, m)
|
||||
{
|
||||
oxenmq::OxenMQ_Init(m);
|
||||
oxenmq::BEncode_Init(m);
|
||||
}
|
|
@ -1,224 +0,0 @@
|
|||
#include "common.hpp"
|
||||
#include <chrono>
|
||||
#include <exception>
|
||||
#include <oxenmq/oxenmq.h>
|
||||
#include <oxenmq/address.h>
|
||||
#include <pybind11/chrono.h>
|
||||
#include <pybind11/functional.h>
|
||||
#include <future>
|
||||
#include <memory>
|
||||
|
||||
namespace oxenmq
|
||||
{
|
||||
template <typename... Options>
|
||||
std::future<std::vector<std::string>> MQ_start_request(
|
||||
OxenMQ& omq,
|
||||
ConnectionID conn,
|
||||
std::string name,
|
||||
std::vector<py::bytes> byte_args,
|
||||
Options&&... opts)
|
||||
{
|
||||
std::vector<std::string> args;
|
||||
args.reserve(byte_args.size());
|
||||
for (auto& b : byte_args)
|
||||
args.push_back(b);
|
||||
|
||||
auto result = std::make_shared<std::promise<std::vector<std::string>>>();
|
||||
auto fut = result->get_future();
|
||||
omq.request(conn, std::move(name),
|
||||
[result=std::move(result)](bool success, std::vector<std::string> value)
|
||||
{
|
||||
if (success)
|
||||
result->set_value(std::move(value));
|
||||
else
|
||||
{
|
||||
std::string err;
|
||||
for (auto& m : value) {
|
||||
if (!err.empty()) err += ", ";
|
||||
err += m;
|
||||
}
|
||||
result->set_exception(std::make_exception_ptr(std::runtime_error{"Request failed: " + err}));
|
||||
}
|
||||
},
|
||||
oxenmq::send_option::data_parts(args.begin(), args.end()),
|
||||
std::forward<Options>(opts)...
|
||||
);
|
||||
return fut;
|
||||
}
|
||||
|
||||
// Binds a stl future. `Conv` is a lambda that converts the future's .get() value into something
|
||||
// Python-y (it can be the value directly, if the value is convertible to Python already).
|
||||
template <typename F, typename Conv>
|
||||
void bind_future(py::module& m, std::string class_name, Conv conv)
|
||||
{
|
||||
py::class_<F>(m, class_name.c_str())
|
||||
.def("get", [conv=std::move(conv)](F& f) { return conv(f.get()); },
|
||||
"Gets the result (or raises an exception if the result set an exception); must only be called once")
|
||||
.def("valid", [](F& f) { return f.valid(); },
|
||||
"Returns true if the result is available")
|
||||
.def("wait", &F::wait,
|
||||
"Waits indefinitely for the result to become available")
|
||||
.def("wait_for", &F::template wait_for<double, std::ratio<1>>,
|
||||
"Waits up to the given timedelta for the result to become available")
|
||||
.def("wait_for", [](F& f, double seconds) { return f.wait_for(std::chrono::duration<double>{seconds}); },
|
||||
"Waits up to the given number of seconds for the result to become available")
|
||||
.def("wait_until", &F::template wait_until<std::chrono::system_clock, std::chrono::system_clock::duration>,
|
||||
"Wait until the given datetime for the result to become available")
|
||||
;
|
||||
}
|
||||
|
||||
static std::mutex log_mutex;
|
||||
|
||||
void
|
||||
OxenMQ_Init(py::module & mod)
|
||||
{
|
||||
using namespace pybind11::literals;
|
||||
py::class_<ConnectionID>(mod, "ConnectionID")
|
||||
.def("__eq__", [](const ConnectionID & self, const ConnectionID & other) {
|
||||
return self == other;
|
||||
});
|
||||
py::class_<Message>(mod, "Message")
|
||||
.def_readonly("remote", &Message::remote)
|
||||
.def_readonly("conn", &Message::conn);
|
||||
|
||||
py::class_<address>(mod, "Address")
|
||||
.def(py::init<std::string>());
|
||||
py::class_<TaggedThreadID>(mod, "TaggedThreadID");
|
||||
py::enum_<LogLevel>(mod, "LogLevel")
|
||||
.value("fatal", LogLevel::fatal).value("error", LogLevel::error).value("warn", LogLevel::warn)
|
||||
.value("info", LogLevel::info).value("debug", LogLevel::debug).value("trace", LogLevel::trace);
|
||||
|
||||
py::enum_<std::future_status>(mod, "future_status")
|
||||
.value("deferred", std::future_status::deferred)
|
||||
.value("ready", std::future_status::ready)
|
||||
.value("timeout", std::future_status::timeout);
|
||||
bind_future<std::future<std::vector<std::string>>>(mod, "ResultFuture",
|
||||
[](std::vector<std::string> bytes) {
|
||||
py::list l;
|
||||
for (const auto& v : bytes)
|
||||
l.append(py::bytes(v));
|
||||
return l;
|
||||
});
|
||||
|
||||
py::class_<OxenMQ>(mod, "OxenMQ")
|
||||
.def(py::init<>())
|
||||
.def(py::init([](LogLevel level) {
|
||||
// Quick and dirty logger that logs to stderr. It would be much nicer to take a python
|
||||
// function, but that deadlocks pretty much right away because of the crappiness of the gil.
|
||||
return std::make_unique<OxenMQ>([] (LogLevel lvl, const char* file, int line, std::string msg) mutable {
|
||||
std::lock_guard l{log_mutex};
|
||||
std::cerr << '[' << lvl << "][" << file << ':' << line << "]: " << msg << "\n";
|
||||
}, level);
|
||||
}))
|
||||
.def_readwrite("handshake_time", &OxenMQ::HANDSHAKE_TIME)
|
||||
.def_readwrite("ephemeral_routing_id", &OxenMQ::EPHEMERAL_ROUTING_ID)
|
||||
.def_readwrite("max_message_size", &OxenMQ::MAX_MSG_SIZE)
|
||||
.def_readwrite("max_sockets", &OxenMQ::MAX_SOCKETS)
|
||||
.def_readwrite("reconnect_interval", &OxenMQ::RECONNECT_INTERVAL)
|
||||
.def_readwrite("close_longer", &OxenMQ::CLOSE_LINGER)
|
||||
.def_readwrite("connection_check_interval", &OxenMQ::CONN_CHECK_INTERVAL)
|
||||
.def_readwrite("connection_heartbeat", &OxenMQ::CONN_HEARTBEAT)
|
||||
.def_readwrite("connection_heartbeat_timeout", &OxenMQ::CONN_HEARTBEAT_TIMEOUT)
|
||||
.def_readwrite("startup_umask", &OxenMQ::STARTUP_UMASK)
|
||||
.def("start", &OxenMQ::start)
|
||||
.def("listen_plain",
|
||||
[](OxenMQ & self, std::string path) {
|
||||
self.listen_plain(path);
|
||||
})
|
||||
.def("listen_curve",
|
||||
[](OxenMQ & self, std::string path) {
|
||||
self.listen_curve(path);
|
||||
})
|
||||
.def("add_tagged_thread",
|
||||
[](OxenMQ & self, std::string name) {
|
||||
return self.add_tagged_thread(name);
|
||||
})
|
||||
.def("add_timer",
|
||||
[](OxenMQ & self, std::chrono::milliseconds interval, std::function<void(void)> callback) {
|
||||
self.add_timer(callback, interval);
|
||||
})
|
||||
.def("call_soon",
|
||||
[](OxenMQ & self, std::function<void(void)> job, std::optional<TaggedThreadID> thread)
|
||||
{
|
||||
self.job(std::move(job), std::move(thread));
|
||||
})
|
||||
.def("add_anonymous_category",
|
||||
[](OxenMQ & self, std::string name)
|
||||
{
|
||||
self.add_category(std::move(name), AuthLevel::none);
|
||||
})
|
||||
.def("add_request_command",
|
||||
[](OxenMQ &self,
|
||||
std::string category,
|
||||
std::string name,
|
||||
py::function handler)
|
||||
{
|
||||
self.add_request_command(category, name,
|
||||
[handler](Message & msg) {
|
||||
std::string result;
|
||||
{
|
||||
py::gil_scoped_acquire gil;
|
||||
|
||||
std::vector<py::bytes> data;
|
||||
for (auto& arg : msg.data)
|
||||
{
|
||||
data.emplace_back(arg.begin(), arg.size());
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
const auto obj = handler(data);
|
||||
result = py::str(obj);
|
||||
}
|
||||
catch(std::exception & ex)
|
||||
{
|
||||
PyErr_SetString(PyExc_RuntimeError, ex.what());
|
||||
}
|
||||
}
|
||||
msg.send_reply(result);
|
||||
});
|
||||
})
|
||||
.def("connect_remote",
|
||||
[](OxenMQ & self,
|
||||
std::string remote) -> ConnectionID
|
||||
{
|
||||
std::promise<ConnectionID> promise;
|
||||
self.connect_remote(
|
||||
remote,
|
||||
[&promise](ConnectionID id) { promise.set_value(std::move(id)); },
|
||||
[&promise](auto, std::string_view reason) {
|
||||
promise.set_exception(std::make_exception_ptr(
|
||||
std::runtime_error{"Connection failed: " + std::string{reason}}));
|
||||
});
|
||||
return promise.get_future().get();
|
||||
})
|
||||
.def("request",
|
||||
[](OxenMQ & self,
|
||||
ConnectionID conn,
|
||||
std::string name,
|
||||
std::vector<py::bytes> args,
|
||||
std::optional<double> timeout) -> py::list
|
||||
{
|
||||
py::list l;
|
||||
for (auto& s : MQ_start_request(self, conn, std::move(name), std::move(args),
|
||||
oxenmq::send_option::request_timeout{timeout ? std::chrono::milliseconds(long(*timeout * 1000)) : DEFAULT_REQUEST_TIMEOUT}
|
||||
).get())
|
||||
l.append(py::bytes(s));
|
||||
return l;
|
||||
},
|
||||
"conn"_a, "name"_a, "args"_a = std::vector<py::bytes>{}, "timeout"_a = py::none{})
|
||||
.def("request_future",
|
||||
[](OxenMQ & self,
|
||||
ConnectionID conn,
|
||||
std::string name,
|
||||
std::vector<py::bytes> args,
|
||||
std::optional<double> timeout) -> std::future<std::vector<std::string>>
|
||||
{
|
||||
return MQ_start_request(self, conn, std::move(name), std::move(args),
|
||||
oxenmq::send_option::request_timeout{timeout ? std::chrono::milliseconds(long(*timeout * 1000)) : DEFAULT_REQUEST_TIMEOUT}
|
||||
);
|
||||
},
|
||||
"conn"_a, "name"_a, "args"_a = std::vector<py::bytes>{}, "timeout"_a = py::none{})
|
||||
;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,8 @@
|
|||
[build-system]
|
||||
requires = [
|
||||
"setuptools>=42",
|
||||
"wheel",
|
||||
"pybind11>=2.6.0",
|
||||
]
|
||||
|
||||
build-backend = "setuptools.build_meta"
|
|
@ -0,0 +1,3 @@
|
|||
[pytest]
|
||||
testpaths =
|
||||
tests
|
88
setup.py
88
setup.py
|
@ -1,74 +1,30 @@
|
|||
import os
|
||||
import re
|
||||
import sys
|
||||
import platform
|
||||
import subprocess
|
||||
from setuptools import setup
|
||||
|
||||
from setuptools import setup, Extension
|
||||
from setuptools.command.build_ext import build_ext
|
||||
from distutils.version import LooseVersion
|
||||
# Available at setup time due to pyproject.toml
|
||||
from pybind11.setup_helpers import Pybind11Extension, build_ext
|
||||
|
||||
__version__ = "1.0.0"
|
||||
|
||||
class CMakeExtension(Extension):
|
||||
def __init__(self, name, sourcedir=''):
|
||||
Extension.__init__(self, name, sources=[])
|
||||
self.sourcedir = os.path.abspath(sourcedir)
|
||||
# Note:
|
||||
# Sort input source files if you glob sources to ensure bit-for-bit
|
||||
# reproducible builds (https://github.com/pybind/python_example/pull/53)
|
||||
|
||||
|
||||
class CMakeBuild(build_ext):
|
||||
def run(self):
|
||||
try:
|
||||
out = subprocess.check_output(['cmake', '--version'])
|
||||
except OSError:
|
||||
raise RuntimeError("CMake must be installed to build the following extensions: " +
|
||||
", ".join(e.name for e in self.extensions))
|
||||
|
||||
if platform.system() == "Windows":
|
||||
cmake_version = LooseVersion(re.search(r'version\s*([\d.]+)', out.decode()).group(1))
|
||||
if cmake_version < '3.1.0':
|
||||
raise RuntimeError("CMake >= 3.1.0 is required on Windows")
|
||||
|
||||
for ext in self.extensions:
|
||||
self.build_extension(ext)
|
||||
|
||||
def build_extension(self, ext):
|
||||
extdir = os.path.abspath(os.path.dirname(self.get_ext_fullpath(ext.name)))
|
||||
# required for auto-detection of auxiliary "native" libs
|
||||
if not extdir.endswith(os.path.sep):
|
||||
extdir += os.path.sep
|
||||
|
||||
cmake_args = ['-DCMAKE_LIBRARY_OUTPUT_DIRECTORY=' + extdir,
|
||||
'-DPYTHON_EXECUTABLE=' + sys.executable]
|
||||
|
||||
cfg = 'Debug' if self.debug else 'Release'
|
||||
build_args = ['--config', cfg]
|
||||
|
||||
if platform.system() == "Windows":
|
||||
cmake_args += ['-DCMAKE_LIBRARY_OUTPUT_DIRECTORY_{}={}'.format(cfg.upper(), extdir)]
|
||||
if sys.maxsize > 2**32:
|
||||
cmake_args += ['-A', 'x64']
|
||||
build_args += ['--', '/m']
|
||||
else:
|
||||
cmake_args += ['-DCMAKE_BUILD_TYPE=' + cfg]
|
||||
build_args += ['--', '-j2']
|
||||
|
||||
env = os.environ.copy()
|
||||
env['CXXFLAGS'] = '{} -DVERSION_INFO=\\"{}\\"'.format(env.get('CXXFLAGS', ''),
|
||||
self.distribution.get_version())
|
||||
if not os.path.exists(self.build_temp):
|
||||
os.makedirs(self.build_temp)
|
||||
subprocess.check_call(['cmake', ext.sourcedir] + cmake_args, cwd=self.build_temp, env=env)
|
||||
subprocess.check_call(['cmake', '--build', '.'] + build_args, cwd=self.build_temp)
|
||||
ext_modules = [Pybind11Extension(
|
||||
"oxenmq",
|
||||
["src/bencode.cpp", "src/module.cpp", "src/oxenmq.cpp"],
|
||||
cxx_std=17,
|
||||
libraries=["oxenmq"],
|
||||
),
|
||||
]
|
||||
|
||||
setup(
|
||||
name='pyoxenmq',
|
||||
version='0.1.0',
|
||||
author='Jeff Becker',
|
||||
author_email='jeff@i2p.rocks',
|
||||
description='pybind oxenmq bindings',
|
||||
long_description='',
|
||||
ext_modules=[CMakeExtension('pyoxenmq')],
|
||||
packages=["lokinet.auth"],
|
||||
cmdclass=dict(build_ext=CMakeBuild),
|
||||
name="oxenmq",
|
||||
version=__version__,
|
||||
author="Jason Rhinelander",
|
||||
author_email="jason@oxen.io",
|
||||
url="https://github.com/oxen-io/oxen-mq",
|
||||
description="Python wrapper for oxen-mq message passing library",
|
||||
long_description="",
|
||||
ext_modules=ext_modules,
|
||||
zip_safe=False,
|
||||
)
|
||||
|
|
|
@ -0,0 +1,15 @@
|
|||
#include "common.hpp"
|
||||
#include "oxenmq/base32z.h"
|
||||
|
||||
namespace oxenmq {
|
||||
|
||||
void BEncode_Init(py::module& mod) {
|
||||
mod.def("base32z_encode", [](py::bytes data) {
|
||||
char* ptr = nullptr;
|
||||
py::ssize_t sz = 0;
|
||||
PyBytes_AsStringAndSize(data.ptr(), &ptr, &sz);
|
||||
return oxenmq::to_base32z(ptr, ptr+sz);
|
||||
});
|
||||
}
|
||||
|
||||
}
|
|
@ -5,11 +5,9 @@
|
|||
|
||||
namespace py = pybind11;
|
||||
|
||||
namespace oxenmq
|
||||
{
|
||||
void
|
||||
OxenMQ_Init(py::module &mod);
|
||||
namespace oxenmq {
|
||||
|
||||
void OxenMQ_Init(py::module &mod);
|
||||
void BEncode_Init(py::module & mod);
|
||||
|
||||
void
|
||||
BEncode_Init(py::module & mod);
|
||||
}
|
|
@ -0,0 +1,6 @@
|
|||
#include "common.hpp"
|
||||
|
||||
PYBIND11_MODULE(oxenmq, m) {
|
||||
oxenmq::OxenMQ_Init(m);
|
||||
oxenmq::BEncode_Init(m);
|
||||
}
|
|
@ -0,0 +1,972 @@
|
|||
#include "common.hpp"
|
||||
#include <chrono>
|
||||
#include <exception>
|
||||
#include <oxenmq/oxenmq.h>
|
||||
#include <oxenmq/address.h>
|
||||
#include <pybind11/attr.h>
|
||||
#include <pybind11/chrono.h>
|
||||
#include <pybind11/functional.h>
|
||||
#include <pybind11/operators.h>
|
||||
#include <pybind11/pybind11.h>
|
||||
#include <pybind11/pytypes.h>
|
||||
#include <pybind11/stl.h>
|
||||
#include <future>
|
||||
#include <memory>
|
||||
#include <variant>
|
||||
|
||||
namespace oxenmq {
|
||||
|
||||
// Convert a py::object containing a str, bytes, or iterable over str/bytes to a vector of message
|
||||
// parts. Throws on invalid input, otherwise returns a vector of parts.
|
||||
//
|
||||
// The gil must be held.
|
||||
//
|
||||
// The first version appends into `parts`, the second version returns a vector.
|
||||
void extract_data_parts(std::vector<std::string>& parts, py::handle obj) {
|
||||
if (py::isinstance<py::bytes>(obj))
|
||||
parts.push_back(obj.cast<py::bytes>());
|
||||
else if (py::isinstance<py::str>(obj))
|
||||
parts.push_back(obj.cast<py::str>());
|
||||
else if (py::isinstance<py::iterable>(obj)) {
|
||||
for (auto o : obj)
|
||||
extract_data_parts(parts, o);
|
||||
} else {
|
||||
throw std::runtime_error{"invalid value '" + std::string{py::repr(obj)} + "': expected bytes/str/iterable"};
|
||||
}
|
||||
}
|
||||
std::vector<std::string> extract_data_parts(py::handle obj) {
|
||||
std::vector<std::string> parts;
|
||||
extract_data_parts(parts, obj);
|
||||
return parts;
|
||||
}
|
||||
|
||||
// Quick and dirty logger that logs to stderr. It would be much nicer to take a python function,
|
||||
// but that deadlocks pretty much right away because of the crappiness of the gil.
|
||||
struct stderr_logger {
|
||||
inline static std::mutex log_mutex;
|
||||
|
||||
void operator()(LogLevel lvl, const char* file, int line, std::string msg) {
|
||||
std::lock_guard l{log_mutex};
|
||||
std::cerr << '[' << lvl << "][" << file << ':' << line << "]: " << msg << "\n";
|
||||
}
|
||||
};
|
||||
|
||||
void
|
||||
OxenMQ_Init(py::module& mod)
|
||||
{
|
||||
using namespace pybind11::literals;
|
||||
constexpr py::kw_only kwonly{};
|
||||
|
||||
py::class_<ConnectionID>(mod, "ConnectionID")
|
||||
.def(py::self == py::self)
|
||||
.def(py::self != py::self)
|
||||
.def_property_readonly("service_node", &ConnectionID::sn)
|
||||
.def_property_readonly("pubkey", [](const ConnectionID& c) { return py::bytes(c.pubkey()); })
|
||||
;
|
||||
py::class_<address>(mod, "Address")
|
||||
.def(py::init<std::string_view>(), "addr"_a,
|
||||
R"(Constructs from an encoded address such as 'curve://HOSTNAME:PORT/PUBKEY'.
|
||||
|
||||
See oxenmq::Address C++ documentation for more details)")
|
||||
.def(py::init([](std::string_view addr, py::bytes pubkey) { return address{addr, (std::string) pubkey}; }),
|
||||
"addr"_a, "pubkey"_a,
|
||||
R"(Constructs from a ZMQ connection string and a 32-byte pubkey.
|
||||
|
||||
This can be used for ipc+curve connections by using an address such as ipc:///path/to/socket)")
|
||||
.def(py::init(py::overload_cast<std::string, uint16_t>(&address::tcp)),
|
||||
"host"_a, "port"_a,
|
||||
R"(Construct a TCP address from a host and port.
|
||||
|
||||
The connection will be plaintext. If the host is an IPv6 address it *must* be surrounded with [ and ].)")
|
||||
.def(py::init([](std::string host, uint16_t port, py::bytes pubkey) {
|
||||
return address::tcp_curve(std::move(host), port, pubkey); }),
|
||||
"host"_a, "port"_a, "pubkey"_a,
|
||||
"Constructs a curve-encrypted TCP address")
|
||||
|
||||
.def_property("pubkey",
|
||||
[](const address& a) { return py::bytes(a.pubkey); },
|
||||
[](address& a, std::optional<py::bytes> pubkey) {
|
||||
if (pubkey) a.set_pubkey(pubkey->operator std::string());
|
||||
else a.set_pubkey("");
|
||||
},
|
||||
R"(Sets or clears the address pubkey.
|
||||
|
||||
If specifying a pubkey then this address is set to use curve encryption with the given 32-byte
|
||||
`bytes` pubkey required for the remote endpoint. An existing pubkey is replaced, if present.
|
||||
|
||||
If set to None then this address is set to use an unencrypted plaintext connection.)")
|
||||
.def_property_readonly("curve", py::overload_cast<>(&address::curve, py::const_), "true if this is a curve-enabled address")
|
||||
.def_property_readonly("tcp", py::overload_cast<>(&address::tcp, py::const_), "true if this is a tcp address")
|
||||
.def_property_readonly("ipc", py::overload_cast<>(&address::ipc, py::const_), "true if this is an ipc address")
|
||||
.def_property_readonly("zmq_address", &address::zmq_address,
|
||||
"accesses the zmq address portion of the address (note that this does not contain any curve encryption information)")
|
||||
.def_property_readonly("full_address", [](const address& a) { return a.full_address(address::encoding::base32z); },
|
||||
"returns the full address, including curve information, encoding the curve pubkey as base32z")
|
||||
.def_property_readonly("full_address_b64", [](const address& a) { return a.full_address(address::encoding::base64); },
|
||||
"returns the full address, including curve information, encoding the curve pubkey as base64")
|
||||
.def_property_readonly("full_address_hex", [](const address& a) { return a.full_address(address::encoding::hex); },
|
||||
"returns the full address, including curve information, encoding the curve pubkey as hex")
|
||||
.def_property_readonly("qr", &address::qr_address,
|
||||
R"(Access the full address as a RQ-encoding optimized string.
|
||||
|
||||
Only available for tcp addresses. The resulting string only contains characters from the
|
||||
Alphanumeric QR alphabet (i.e. all uppercase, and uses $...$ for IPv6 addresses instead of [...]),
|
||||
allowing for more efficient QR encoding. This format can be passed to the single-argument Address
|
||||
constructor.)")
|
||||
.def(py::self == py::self)
|
||||
.def(py::self != py::self)
|
||||
;
|
||||
py::class_<TaggedThreadID>(mod, "TaggedThreadID");
|
||||
py::class_<TimerID>(mod, "TimerID");
|
||||
|
||||
py::enum_<AuthLevel>(mod, "AuthLevel")
|
||||
.value("denied", AuthLevel::denied,
|
||||
"Not actually an auth level, but can be returned by the AllowFunc to deny an incoming connection.")
|
||||
.value("none", AuthLevel::none,
|
||||
"No authentication at all; any random incoming ZMQ connection can invoke this command.")
|
||||
.value("basic", AuthLevel::basic,
|
||||
"Basic authentication commands require a login, or a node that is specifically configured to be a public node (e.g. for public RPC).")
|
||||
.value("admin", AuthLevel::admin,
|
||||
R"(Advanced authentication commands require an admin user, either via explicit login or by implicit login from localhost.
|
||||
|
||||
This typically protects administrative commands like shutting down or access to sensitive data.)")
|
||||
;
|
||||
|
||||
py::class_<Access>(mod, "Access", "The access level for a command category.")
|
||||
.def(py::init<AuthLevel, bool, bool>(),
|
||||
"auth"_a = AuthLevel::none,
|
||||
"remote_sn"_a = false,
|
||||
"local_sn"_a = false,
|
||||
R"(Specifies a command category access level.
|
||||
|
||||
- auth - the required access level to access commands in this category.
|
||||
|
||||
- remote_sn - if True then this command may only be invoked by remote connections who we recognize
|
||||
(by pubkey) as being service nodes. (Requires sn_lookup to be provided during construction).
|
||||
|
||||
- local_sn - if True then this command will be unavailable if the OxenMQ object was not constructed
|
||||
with service_node=True.)");
|
||||
py::implicitly_convertible<AuthLevel, Access>();
|
||||
|
||||
py::class_<Message> msg(mod, "Message", "Temporary object containing details of a just-received message");
|
||||
msg
|
||||
.def_property_readonly("is_reply", [](const Message& m) { return !m.reply_tag.empty(); },
|
||||
"True if this message is expecting a reply (i.e. it was received on a request_command endpoint)")
|
||||
.def_readonly("remote", &Message::remote, R"(Some sort of remote address from which the request came.
|
||||
|
||||
Typically the IP address string for TCP connections and "localhost:UID:GID:PID" for unix socket IPC connections.)")
|
||||
.def_readonly("conn", &Message::conn, "The connection ID info for routing a reply")
|
||||
.def_readonly("access", &Message::access,
|
||||
"The access level of the invoker (which can be higher than the access level required for the command category")
|
||||
.def("dataview", [](const Message& m) {
|
||||
py::list l;
|
||||
for (auto& part : m.data)
|
||||
l.append(py::memoryview::from_memory(part.data(), part.size()));
|
||||
return l;
|
||||
},
|
||||
R"(Returns a list of the data message parts as memoryviews.
|
||||
|
||||
Note that the returned views are only valid for the duration of the callback invoked with the
|
||||
Message; if you need them beyond that then you must copy them (e.g. by calling message.data()
|
||||
or .to_bytes() on each one)"
|
||||
)
|
||||
.def("data", [](const Message& m) {
|
||||
py::list l;
|
||||
for (auto& part : m.data)
|
||||
l.append(py::bytes{part.data(), part.size()});
|
||||
return l;
|
||||
},
|
||||
"Returns a *copy* of the data message parts as a list of `bytes`."
|
||||
)
|
||||
.def("reply", [](Message& m, py::args args) {
|
||||
m.send_reply(send_option::data_parts(extract_data_parts(args)));
|
||||
},
|
||||
R"(Sends a reply back to this caller.
|
||||
|
||||
`args` must be bytes, str, or iterables thereof (and will be flatted). Should only be used from a
|
||||
request_command endpoint (i.e. when .is_reply is true)")
|
||||
.def("back", [](Message& m, std::string command, py::args args) {
|
||||
m.send_back(command, send_option::data_parts(extract_data_parts(args)));
|
||||
},
|
||||
"command"_a,
|
||||
R"(Sends a new message (NOT a reply) to the caller.
|
||||
|
||||
This is used to send a simple message back to the caller which is *not* specifically a request reply
|
||||
but rather is simply an endpoint to invoke on the caller who sent this message to us. `command`
|
||||
should be the command endpoint, and `response` must be none (for no data parts), bytes, str, or an
|
||||
iterable thereof.
|
||||
|
||||
This is a shortcut for `oxenmq.send(msg.conn, command, *args)`; use the full version if you need
|
||||
extra send functionality.)")
|
||||
.def("request", [](Message& m, std::string command, OxenMQ::ReplyCallback callback, py::args args) {
|
||||
std::vector<std::string> data;
|
||||
m.send_request(command, std::move(callback), send_option::data_parts(extract_data_parts(args)));
|
||||
},
|
||||
"command"_a, "on_reply"_a,
|
||||
R"(Sends a new request (NOT a reply) back to the remote caller.
|
||||
|
||||
This is used to send a new request back to the caller which is *not* specifically a request reply
|
||||
but rather is simply a new request endpoint to invoke on the caller who sent this message to us.
|
||||
`command` should be the command endpoint, and `response` must be none (for no data parts), bytes,
|
||||
str, or an iterable thereof.
|
||||
|
||||
This is a shortcut for `oxenmq.request(msg.conn, command, on_reply, *args)`; use the full version if you need
|
||||
extra send functionality.)")
|
||||
.def("later", &Message::send_later,
|
||||
R"(Returns a proxy object which can be stored and used to reply to the remote caller at a later point.
|
||||
|
||||
Unlike Message, the value returned here may outlive the command callback (as long as the OxenMQ
|
||||
instance is still alive).)")
|
||||
;
|
||||
|
||||
py::class_<Message::DeferredSend>(msg, "DeferredSend")
|
||||
.def_property_readonly("is_reply", [](const Message::DeferredSend& m) { return !m.reply_tag.empty(); },
|
||||
"True if this message is expecting a reply (i.e. it was received on a request_command endpoint)")
|
||||
.def("reply", [](Message::DeferredSend& d, py::args args) {
|
||||
d.reply(send_option::data_parts(extract_data_parts(args)));
|
||||
},
|
||||
"Same as Message.reply(), but deferrable")
|
||||
.def("back", [](Message::DeferredSend& d, std::string command, py::args args) {
|
||||
d.back(command, send_option::data_parts(extract_data_parts(args)));
|
||||
},
|
||||
"command"_a,
|
||||
"Same as Message.back(), but deferrable")
|
||||
.def("request", [](Message::DeferredSend& d, std::string command, OxenMQ::ReplyCallback callback, py::args args) {
|
||||
d.request(command, std::move(callback), send_option::data_parts(extract_data_parts(args)));
|
||||
},
|
||||
"command"_a, "on_reply"_a,
|
||||
"Same as Message.request(), but deferrable")
|
||||
.def("__call__", [](Message::DeferredSend& d, py::args args, py::kwargs kwargs) {
|
||||
auto method = py::cast(&d).attr(!d.reply_tag.empty() ? "reply" : "back");
|
||||
return method(*args, **kwargs);
|
||||
},
|
||||
"Equivalent to `reply(...)` for a request message, `back(...)` for a non-request message")
|
||||
;
|
||||
|
||||
py::class_<CatHelper>(mod, "Category",
|
||||
"Helper class to add in registering category commands, returned from OxenMQ.add_category(...)")
|
||||
.def("add_command", [](CatHelper& cat, std::string name, py::function cb) {
|
||||
return cat.add_command(name, [cb=std::move(cb)](Message& m) {
|
||||
py::gil_scoped_acquire gil;
|
||||
cb(&m);
|
||||
});
|
||||
},
|
||||
"name"_a, "callback"_a,
|
||||
R"(Add a command handler to this category.
|
||||
|
||||
Adds a command, that is a command that is typically some sort of instruction that requires no reply.
|
||||
(For a more typically request-response interface use .add_request_command instead).
|
||||
|
||||
The callback is passed a `Message` object containing details of the received message. Note that
|
||||
this object must *not* be stored beyond the callback itself; see `Message` for details.)")
|
||||
.def("add_request_command",
|
||||
[](CatHelper& cat,
|
||||
std::string name,
|
||||
std::function<py::object(Message* msg)> handler)
|
||||
{
|
||||
cat.add_request_command(name, [handler](Message& msg) {
|
||||
std::vector<std::string> result;
|
||||
{
|
||||
py::gil_scoped_acquire gil;
|
||||
|
||||
py::object obj = handler(&msg);
|
||||
if (obj.is_none())
|
||||
return;
|
||||
try {
|
||||
result = extract_data_parts(obj);
|
||||
} catch (const std::exception& e) {
|
||||
msg.oxenmq.log(LogLevel::warn, __FILE__, __LINE__,
|
||||
"Python callback returned "s + e.what());
|
||||
return;
|
||||
}
|
||||
}
|
||||
msg.send_reply(send_option::data_parts(result));
|
||||
});
|
||||
return &cat;
|
||||
},
|
||||
"name"_a, "handler"_a,
|
||||
R"(Add a request command to this category.
|
||||
|
||||
Adds a request command, that is, a command that is always expected to reply, to this category. The
|
||||
callback is passed a Message object containing details of the required message and must return one
|
||||
of:
|
||||
|
||||
- None - no reply will be sent; typically returned because you sent it yourself (via
|
||||
Message.reply()), or because you want to send it later via Message.later().
|
||||
- bytes - will be sent as is in a single-part reply.
|
||||
- str - will be sent in utf-8 encoding in a single-part reply.
|
||||
- iterable object containing bytes and/or str elements: will be sent as a multi-part reply where
|
||||
each part is sent as-is (bytes) or utf8-encoded (str).
|
||||
|
||||
The callback also must take care not to save the provided `Message` value beyond the end of the
|
||||
callback itself.)")
|
||||
;
|
||||
|
||||
py::enum_<LogLevel>(mod, "LogLevel")
|
||||
.value("fatal", LogLevel::fatal).value("error", LogLevel::error).value("warn", LogLevel::warn)
|
||||
.value("info", LogLevel::info).value("debug", LogLevel::debug).value("trace", LogLevel::trace);
|
||||
|
||||
py::class_<OxenMQ> oxenmq{mod, "OxenMQ"};
|
||||
oxenmq
|
||||
.def(py::init([](
|
||||
py::bytes pubkey,
|
||||
py::bytes privkey,
|
||||
bool sn,
|
||||
OxenMQ::SNRemoteAddress sn_lookup,
|
||||
std::optional<LogLevel> log_level) {
|
||||
return std::make_unique<OxenMQ>(pubkey, privkey, sn, std::move(sn_lookup),
|
||||
log_level ? OxenMQ::Logger{stderr_logger{}} : nullptr,
|
||||
log_level.value_or(LogLevel::warn));
|
||||
}),
|
||||
kwonly,
|
||||
"pubkey"_a = py::bytes(), "privkey"_a = py::bytes(), "service_node"_a = false,
|
||||
"sn_lookup"_a = py::none(), "log_level"_a = py::none(),
|
||||
R"(OxenMQ constructor.
|
||||
|
||||
This constructs the object but does not start it; you will typically want to first add categories
|
||||
and commands, then finish startup by invoking `start()`. (Categories and commands cannot be added
|
||||
after startup).
|
||||
|
||||
Parameters:
|
||||
- pubkey - the x25519 public key (32-byte binary string). For a service node this is the service
|
||||
node x25519 keypair. For non-service nodes this (and privkey) can be empty strings to
|
||||
automatically generate an ephemeral keypair.
|
||||
|
||||
- privkey the service node's private key (32-byte binary string), or empty to generate one.
|
||||
|
||||
- service_node - True if this instance should be considered a service node for the purpose of
|
||||
allowing "Access::local_sn" remote calls. (This should be true if we are *capable* of being a
|
||||
service node, whether or not we are currently actively). If specified as true then the pubkey and
|
||||
privkey values must not be empty.
|
||||
|
||||
- sn_lookup - function that takes a pubkey key (32-byte bytes) and returns a connection string such
|
||||
as "tcp://1.2.3.4:23456" to which a connection should be established to reach that service node.
|
||||
Note that this function is only called if there is no existing connection to that service node,
|
||||
and that the function is never called for a connection to self (that uses an internal connection
|
||||
instead). Also note that the service node must be listening in curve25519 mode (otherwise we
|
||||
couldn't verify its authenticity). Should return empty for not found or if SN lookups are not
|
||||
supported. If omitted a stub function is used that always returns empty.
|
||||
|
||||
- log_level the initial log level; defaults to warn. The log level can be changed later by calling
|
||||
log_level(...). Note that currently all logging goes directly to stderr because of obstacles with
|
||||
Python's GIL. In the future a Python callback may be supported for log messages.
|
||||
)")
|
||||
|
||||
.def_readwrite("handshake_time", &OxenMQ::HANDSHAKE_TIME,
|
||||
"How long to wait for handshaking to complete on new connections before timing out.")
|
||||
.def_readwrite("ephemeral_routing_id", &OxenMQ::EPHEMERAL_ROUTING_ID,
|
||||
R"(Whether to use random connection IDs for outgoing connections.
|
||||
|
||||
If set to True then use random connection IDs for each outgoing connection rather than the default
|
||||
IDs based on the local pubkey (False). Using the same connection ID allows re-establishing
|
||||
connections (even after an application restart) with a remote without losing incoming messages, but
|
||||
does not allow multiple connections to a remote using the same keypair.)")
|
||||
.def_readwrite("max_message_size", &OxenMQ::MAX_MSG_SIZE,
|
||||
R"(Maximum incoming message size.
|
||||
|
||||
If a remote attempts to send something larger the connection is closed. -1 means no limit.)")
|
||||
.def_readwrite("max_sockets", &OxenMQ::MAX_SOCKETS,
|
||||
R"(Maximum open sockets supported by the internal zmq layer.
|
||||
|
||||
Defaults to a large number. If changing then this must be set before start() is called.)")
|
||||
.def_readwrite("reconnect_interval", &OxenMQ::RECONNECT_INTERVAL,
|
||||
"Minimum time to wait before attempting to reconnect a failed connection.")
|
||||
.def_readwrite("reconnect_interval_max", &OxenMQ::RECONNECT_INTERVAL_MAX,
|
||||
R"(Maximum reconnect interval.
|
||||
|
||||
When larger than omq.reconnect_interval then upon subsequent reconnection failures an
|
||||
exponential backoff will be used, up to at most this interval between reconnection attempts.)")
|
||||
.def_readwrite("close_linger", &OxenMQ::CLOSE_LINGER,
|
||||
"How long (at most) to wait for connections to close cleanly when closing.")
|
||||
.def_readwrite("connection_check_interval", &OxenMQ::CONN_CHECK_INTERVAL,
|
||||
R"(How frequently we cleanup connections.
|
||||
|
||||
Cleaning up involves closing idle connections and calling connect or request failure callbacks.
|
||||
Making this slower results in more "overshoot" before failure callbacks are invoked; making it too
|
||||
fast results in more proxy thread overhead. Any change to this variable must be set before calling
|
||||
start().)")
|
||||
.def_readwrite("connection_heartbeat", &OxenMQ::CONN_HEARTBEAT,
|
||||
R"(Whether to enable heartbeats on incoming/outgoing connections.
|
||||
|
||||
If set to > 0 then we set up ZMQ to send a heartbeat ping over the socket this often, which helps
|
||||
keep the connection alive and lets failed connections be detected sooner (see also
|
||||
connection_heartbeat_timeout). Changing the value only affects new connections.
|
||||
|
||||
Defaults to 15s)")
|
||||
.def_readwrite("connection_heartbeat_timeout", &OxenMQ::CONN_HEARTBEAT_TIMEOUT,
|
||||
R"(How long after missing heartbeats to consider a socket dead.
|
||||
|
||||
When .conn_heartbeat is enabled, this sets how long we wait for a reply on a socket before
|
||||
considering the socket to have died and closing it. Changing the value only affects new
|
||||
connections.)")
|
||||
.def_readwrite("startup_umask", &OxenMQ::STARTUP_UMASK,
|
||||
R"(The umask to apply when constructing sockets
|
||||
|
||||
This primarily affects any new ipc:// listening sockets that get created. Does nothing if set to -1
|
||||
(the default), and does nothing on Windows. Note that the umask is applied temporarily during
|
||||
`start()`, so may affect other threads that create files/directories at the same time as the start()
|
||||
call.)")
|
||||
.def_property_readonly("pubkey",
|
||||
[](const OxenMQ& self) {
|
||||
auto& pub = self.get_pubkey();
|
||||
return py::bytes(pub.data(), pub.size());
|
||||
},
|
||||
"Accesses this OxenMQ's x25519 public key, as bytes.")
|
||||
.def_property_readonly("privkey",
|
||||
[](const OxenMQ& self) {
|
||||
auto& priv = self.get_privkey();
|
||||
return py::bytes(priv.data(), priv.size());
|
||||
},
|
||||
"Accesses this OxenMQ's x25519 private key, as bytes.")
|
||||
.def("start", &OxenMQ::start, R"(Starts the OxenMQ object.
|
||||
|
||||
This is called after all initialization (categories, etc.) is configured. This binds to the bind
|
||||
locations given in the constructor and launches the proxy thread to handle message dispatching
|
||||
between remote nodes and worker threads.
|
||||
|
||||
Things you want to do before calling this:
|
||||
- Use `add_category`/`add_command` to set up any commands remote connections can invoke.
|
||||
- If any commands require SN authentication, specify a list of currently active service node
|
||||
pubkeys via `set_active_sns()` (and make sure this gets updated when things change by
|
||||
another `set_active_sns()` or a `update_active_sns()` call). It *is* possible to make the
|
||||
initial call after calling `start()`, but that creates a window during which incoming
|
||||
remote SN connections will be erroneously treated as non-SN connections.
|
||||
- If this LMQ instance should accept incoming connections, set up any listening ports via
|
||||
`listen_curve()` and/or `listen_plain()`.)")
|
||||
.def("listen", [](OxenMQ& self,
|
||||
std::string bind,
|
||||
bool curve,
|
||||
OxenMQ::AllowFunc allow,
|
||||
std::function<void(bool success)> on_bind) {
|
||||
if (curve)
|
||||
self.listen_curve(bind, std::move(allow), std::move(on_bind));
|
||||
else
|
||||
self.listen_plain(bind, std::move(allow), std::move(on_bind));
|
||||
},
|
||||
"bind"_a, "curve"_a, kwonly, "allow_connection"_a = nullptr, "on_bind"_a = nullptr,
|
||||
R"(Start listening on the given bind address.
|
||||
|
||||
Incoming connections can come from anywhere. `allow_connection` is invoked for any incoming
|
||||
connections on this address to determine the incoming remote's access and authentication level.
|
||||
|
||||
This method may be called after start if dynamic listening is required, but it is generally
|
||||
recommended that long-term fixed listening endpoints be set up by calling this *before* start().
|
||||
|
||||
Parameters:
|
||||
|
||||
- bind - can be any bind address string zmq supports, for example a tcp IP/port combination such as:
|
||||
"tcp://*:4567" or "tcp://1.2.3.4:5678".
|
||||
|
||||
- curve - whether the connection is curve-encrypted (True) or plaintext (False). For plaintext
|
||||
connections the allow_connection callback will be invoked with an empty remote pubkey and
|
||||
service_node set to False.
|
||||
|
||||
- allow_connection function to call to determine whether to allow the connection and, if so, the
|
||||
authentication level it receives. The function is called with the remote's address, the remote's
|
||||
32-byte pubkey (only for curve; empty for plaintext), and whether the remote is recognized as a
|
||||
service node (always False for plaintext; requires sn_lookup being configured in construction).
|
||||
If omitted (or null) the default returns AuthLevel::none access for all incoming connections.
|
||||
|
||||
- on_bind a callback to invoke when the port has been successfully opened or failed to open, called
|
||||
with a single boolean argument of True for success, False for failure. For addresses set up
|
||||
before .start() this will be called during `start()` itself; for post-start listens this will be
|
||||
called from the proxy thread when it opens the new port. Note that this function is called
|
||||
directly from the proxy thread and so should be fast and non-blocking.
|
||||
)")
|
||||
.def("add_tagged_thread", [](OxenMQ& self, std::string name, std::function<void()> start) {
|
||||
return self.add_tagged_thread(std::move(name), std::move(start));
|
||||
},
|
||||
"name"_a, kwonly, "start"_a = std::nullopt,
|
||||
R"(Creates a "tagged thread".
|
||||
|
||||
This creates a thread with a specific "tag" and starts it immediately. A tagged thread is one that
|
||||
batches, jobs, and timer jobs can be sent to specifically, typically to perform coordination of some
|
||||
thread-unsafe work, that will be reserved for use only for jobs that specifically request its use.
|
||||
|
||||
Tagged threads will *only* process jobs sent specifically to them; they do not participate in the
|
||||
thread pool used for regular jobs. Each tagged thread also has its own job queue completely
|
||||
separate from any other jobs.
|
||||
|
||||
Tagged threads must be created *before* `start()` is called. The name will be used to set the
|
||||
thread name in the process table (if supported on the OS).
|
||||
|
||||
Parameters:
|
||||
|
||||
- name - the name of the thread; will be used in log messages and (if supported by the OS) as the
|
||||
system thread name.
|
||||
|
||||
- start - an optional callback to invoke from the thread as soon as OxenMQ itself starts up (i.e.
|
||||
after a call to `start()`).
|
||||
|
||||
Returns a TaggedThreadID object that can be passed to job(), batch(), or add_timer() to direct the
|
||||
task to the tagged thread.
|
||||
)")
|
||||
.def("set_general_threads", &OxenMQ::set_general_threads, "threads"_a,
|
||||
R"(Sets the number of general threads to use for handling requests.
|
||||
|
||||
This controls the maximum number of threads that may be created to deal with general tasks such as
|
||||
handling incoming commands. Cannot be called after `start()`.
|
||||
|
||||
If not called then the default is to use the number of CPUs/threads detected on the system.
|
||||
|
||||
Changing this also affects the default value of set_batch_threads() and set_reply_threads().)")
|
||||
.def("set_reply_threads", &OxenMQ::set_reply_threads, "threads"_a,
|
||||
|
||||
R"(Set the minimum number of threads used for processing replies.
|
||||
|
||||
This sets the minimum number of threads dedicated to processing incoming request commands (i.e. that
|
||||
need a reply) and related operations such as the on_success/on_failure handlers for establishing
|
||||
connections. Cannot be called after `start()`.
|
||||
|
||||
If unset, this defaults to one-eighth of the number of general threads, rounded up.
|
||||
|
||||
Note that any such tasks would *first* use general threads; this limit would allow spawning new
|
||||
threads only if all general threads are currently busy *and* there are not at least this many reply
|
||||
tasks currently being processed.)")
|
||||
|
||||
.def("set_batched_threads", &OxenMQ::set_batch_threads,
|
||||
"threads"_a, R"(Set the maximum number of threads to spawn for batch jobs.
|
||||
|
||||
This sets the limit on the maximum number of threads that may be created to process batch jobs,
|
||||
including explicit batch jobs and timers scheduled via add_timer(), but not including jobs that are
|
||||
sent to a specific TaggedThreadID. Cannot be called after `start()`.
|
||||
|
||||
Batch jobs will first attempt to use an available general thread; an extra thread will be spawned to
|
||||
handle a batch job only if all general threads are currently busy *and* fewer than this many threads
|
||||
are currently processing batch jobs.)")
|
||||
|
||||
.def("add_timer", py::overload_cast<
|
||||
std::function<void()>,
|
||||
std::chrono::milliseconds,
|
||||
bool,
|
||||
std::optional<TaggedThreadID>>(&OxenMQ::add_timer),
|
||||
"job"_a, "interval"_a, kwonly, "squelch"_a = true, "thread"_a = std::nullopt,
|
||||
R"(Adds a callback to be invoked on a repeating timer.
|
||||
|
||||
The callback will be invoke approximately every `interval`.
|
||||
|
||||
Optional parameters:
|
||||
|
||||
- squelch - When True (the default) this job will not be double-booked: that is, the callback will
|
||||
be skipped if a previous callback from this timer is already scheduled (or is still running). If
|
||||
set to False then the callback will be scheduled even if an existing callback has not yet
|
||||
completed.
|
||||
|
||||
- thread - a TaggedThreadID specifying a tagged thread (created with `add_tagged_thread`) in which
|
||||
the timer should run. If unspecified then the timer runs in the general batch job queue.)")
|
||||
.def("cancel_timer", &OxenMQ::cancel_timer, R"(Cancels a timer previously added with add_timer().
|
||||
|
||||
Note that this does not queue any already-scheduled timer job and as a result it is possible for an
|
||||
already-scheduled timer job to run *after* this call removes the timer.
|
||||
|
||||
It is permitted to call this with the same TimerID multiple times; subsequent calls have no effect.)")
|
||||
.def("job", &OxenMQ::job, "job"_a, py::kw_only(), "thread"_a = std::nullopt,
|
||||
R"(Queues a job to be run as an OxenMQ job.
|
||||
|
||||
This submits a callback to be invoked by OxenMQ. The job can either be scheduled with general batch
|
||||
jobs or can be directed to a specific tagged thread (created with `add_tagged_thread`).)")
|
||||
.def("add_category", &OxenMQ::add_category,
|
||||
"name"_a, "access_level"_a, kwonly, "reserved_threads"_a = 0, "max_queue"_a = 200,
|
||||
py::keep_alive<0, 1>(),
|
||||
R"(Add a new command category.
|
||||
|
||||
Returns an object that adds commands or request commands to the category.
|
||||
|
||||
Parameters:
|
||||
|
||||
name - the category name; must be non-empy and may not contain a ".".
|
||||
|
||||
access_level - the access requirements for remote invocation of the commands inside this category.
|
||||
Can either be a full Access instance or, when you don't care about SN status, simply an AuthLevel
|
||||
value.
|
||||
|
||||
reserved_threads - if > 0 then always allow this many endpoints of this category to be processed at
|
||||
once. In particular if no worker thread is available and fewer than this are currently processing
|
||||
commands in this category then we will spawn a new thread to handle the task. This is used to
|
||||
ensure that some categories always have processing capacity even if other long-running tasks are
|
||||
using all general threads.
|
||||
|
||||
max_queue - the maximum number of incoming commands that will be queued for commands in this
|
||||
category waiting for an available thread to process them before we start dropping new incoming
|
||||
commands. -1 means unlimited, 0 means we never queue (i.e. we drop if no thread is immediately
|
||||
available).
|
||||
)")
|
||||
.def("add_command_alias", &OxenMQ::add_command_alias,
|
||||
"from"_a, "to"_a,
|
||||
R"(Adds a command alias.
|
||||
|
||||
This allows adding backwards-compatible aliases for commands that have moved. For example mapping
|
||||
"cat.meow" to "dog.bark" would make any incoming requests to the cat.meow endpoint be treated as if
|
||||
they had requested the dog.bark endpoint. Note that this mapping happens *before* applying category
|
||||
permissions: in this example, the required permissions the access the endpoint would be those of the
|
||||
"dog" category rather than the "cat" category.)")
|
||||
.def("connect_remote", [](OxenMQ& self,
|
||||
const address& remote,
|
||||
OxenMQ::ConnectSuccess on_success,
|
||||
OxenMQ::ConnectFailure on_failure,
|
||||
std::chrono::milliseconds timeout,
|
||||
std::optional<bool> ephemeral_routing_id) {
|
||||
|
||||
return self.connect_remote(remote, std::move(on_success), std::move(on_failure),
|
||||
connect_option::timeout{timeout},
|
||||
connect_option::ephemeral_routing_id{ephemeral_routing_id.value_or(self.EPHEMERAL_ROUTING_ID)}
|
||||
);
|
||||
},
|
||||
"remote"_a, "on_success"_a, "on_failure"_a,
|
||||
kwonly,
|
||||
"timeout"_a = oxenmq::REMOTE_CONNECT_TIMEOUT, "ephemeral_routing_id"_a = std::nullopt,
|
||||
R"(
|
||||
Starts connecting to a remote address and return immediately. The connection can be used
|
||||
immediately, however messages will only be queued until the connection is established (or dropped if
|
||||
the connection fails). The given callbacks are invoked for success or failure.
|
||||
|
||||
`ephemeral_routing_id` and `timeout` allowing overriding the defaults (oxenmq.EPHEMERAL_ROUTING_ID
|
||||
and 10s, respectively).
|
||||
)")
|
||||
.def("connect_remote", [](OxenMQ& self,
|
||||
const address& remote,
|
||||
std::chrono::milliseconds timeout,
|
||||
std::optional<bool> ephemeral_routing_id) {
|
||||
std::promise<ConnectionID> promise;
|
||||
self.connect_remote(
|
||||
remote,
|
||||
[&promise](ConnectionID id) { promise.set_value(std::move(id)); },
|
||||
[&promise](auto, std::string_view reason) {
|
||||
promise.set_exception(std::make_exception_ptr(
|
||||
std::runtime_error{"Connection failed: " + std::string{reason}}));
|
||||
},
|
||||
oxenmq::connect_option::timeout{timeout},
|
||||
connect_option::ephemeral_routing_id{ephemeral_routing_id.value_or(self.EPHEMERAL_ROUTING_ID)}
|
||||
);
|
||||
return promise.get_future().get();
|
||||
}, "remote"_a, "timeout"_a = oxenmq::REMOTE_CONNECT_TIMEOUT, "ephemeral_routing_id"_a = std::nullopt,
|
||||
R"(Simpler version of connect_remote that connects to a remote address synchronously.
|
||||
|
||||
This will block until the connection is established or times out; throws on connection failure,
|
||||
returns the ConnectionID on success.
|
||||
|
||||
Takes the address and an optional `timeout` to override the timeout (default 10s))")
|
||||
.def("connect_sn", [](OxenMQ& self,
|
||||
py::bytes pubkey,
|
||||
std::optional<std::chrono::milliseconds> keep_alive,
|
||||
std::optional<std::string> remote_hint,
|
||||
std::optional<bool> ephemeral_routing_id) {
|
||||
return self.connect_sn(std::string{pubkey},
|
||||
connect_option::keep_alive{keep_alive.value_or(-1ms)},
|
||||
connect_option::hint{remote_hint.value_or("")},
|
||||
connect_option::ephemeral_routing_id{ephemeral_routing_id.value_or(self.EPHEMERAL_ROUTING_ID)});
|
||||
}, "pubkey"_a, kwonly, "keep_alive"_a, "remote_hint"_a, "ephemeral_routing_id"_a,
|
||||
R"(Connect to a remote service node by pubkey.
|
||||
|
||||
Try to initiate a connection to the given SN in anticipation of needing a connection in the future.
|
||||
If a connection is already established, the connection's idle timer will be reset (so that the
|
||||
connection will not be closed too soon). If the given idle timeout is greater than the current idle
|
||||
timeout then the timeout increases to the new value; if less than the current timeout it is ignored.
|
||||
(Note that idle timeouts only apply if the existing connection is an outgoing connection).
|
||||
|
||||
Note that this method (along with send) doesn't block waiting for a connection; it merely instructs
|
||||
the proxy thread that it should establish a connection.
|
||||
|
||||
Parameters:
|
||||
- pubkey - the public key (length 32 bytes value) of the service node to connect to; the remote's
|
||||
address will be determined by calling the sn_lookup function giving during construction.
|
||||
|
||||
- keep_alive - how long the SN connection will be kept alive after valid activity. Defaults to 5
|
||||
minutes, which is notably longer than the default 30s for automatic connections established when
|
||||
using `send()` with a pubkey.
|
||||
|
||||
- remote_hint - a connection string that *may* be used instead of doing a lookup via the sn_lookup
|
||||
callback to find the remote address. Typically provided only if the location has already been
|
||||
looked up for some other reason.
|
||||
|
||||
- ephemeral_routing_id - if set, override the default OxenMQ.EPHEMERAL_ROUTING_ID for this
|
||||
connection.
|
||||
|
||||
Returns a ConnectionID that identifies an connection with the given SN. Typically you *don't* need
|
||||
to worry about saving this (and can just discard it): you can always simply pass the pubkey into
|
||||
send/request methods to send to the SN by pubkey.
|
||||
)")
|
||||
.def("connect_inproc", &OxenMQ::connect_inproc<>,
|
||||
"on_success"_a, "on_failure"_a,
|
||||
R"(Establish a connection to ourself.
|
||||
|
||||
Connects to the built-in in-process listening socket of this OxenMQ server for local communication.
|
||||
Note that auth_level defaults to admin (unlike connect_remote), and the default timeout is much
|
||||
shorter.
|
||||
|
||||
This connection is designed to allow code within the same process to invoke registered commands via
|
||||
the OxenMQ object. The connection works whether or not there are any accessible external listeners.
|
||||
|
||||
Also note that incoming inproc requests are unauthenticated: that is, they will always have
|
||||
admin-level access.
|
||||
)")
|
||||
.def("disconnect", &OxenMQ::disconnect,
|
||||
"conn"_a, "linger"_a = 1s,
|
||||
R"(Disconnect an established connection.
|
||||
|
||||
Disconnects an established outgoing connection established with `connect_remote()` (or, less
|
||||
commonly, `connect_sn()`).
|
||||
|
||||
`linger` allows you to control how long (at most) the connection will be allowed to linger if there
|
||||
are still pending messages to be delivered; if those messages are not delivered within the given
|
||||
time then the connection is closed anyway. (Note that this is non-blocking: the lingering occurs in
|
||||
the background).)")
|
||||
|
||||
.def("send", [](OxenMQ& self, std::variant<ConnectionID, py::bytes> conn,
|
||||
std::string command,
|
||||
py::args args, py::kwargs kwargs) {
|
||||
|
||||
if (auto* bytes = std::get_if<py::bytes>(&conn)) {
|
||||
if (len(*bytes) != 32)
|
||||
throw std::logic_error{"Error: send(...) to=pubkey requires 32-byte pubkey"};
|
||||
conn.emplace<ConnectionID>(*bytes);
|
||||
}
|
||||
|
||||
bool request = kwargs.contains("request") && kwargs["request"].cast<bool>();
|
||||
std::optional<py::function> on_reply, on_reply_failure;
|
||||
if (request) {
|
||||
if (kwargs.contains("on_reply"))
|
||||
on_reply = kwargs["on_reply"].cast<py::function>();
|
||||
if (kwargs.contains("on_reply_failure"))
|
||||
on_reply_failure = kwargs["on_reply_failure"].cast<py::function>();
|
||||
} else if (kwargs.contains("on_reply") || kwargs.contains("on_reply_failure")) {
|
||||
throw std::logic_error{"Error: send(...) on_reply=/on_reply_failure= option "
|
||||
"requires request=True (perhaps you meant to use `.request(...)` instead?)"};
|
||||
}
|
||||
send_option::hint hint{kwargs.contains("remote_hint") ? kwargs["remote_hint"].cast<std::string>() : ""s};
|
||||
send_option::optional optional{kwargs.contains("optional") && kwargs["optional"].cast<bool>()};
|
||||
send_option::incoming incoming{kwargs.contains("incoming_only") && kwargs["incoming_only"].cast<bool>()};
|
||||
send_option::outgoing outgoing{kwargs.contains("outgoing") && kwargs["outgoing"].cast<bool>()};
|
||||
send_option::keep_alive keep_alive{
|
||||
kwargs.contains("keep_alive") ? kwargs["keep_alive"].cast<std::chrono::milliseconds>() : -1ms};
|
||||
send_option::request_timeout request_timeout{
|
||||
kwargs.contains("request_timeout") ? kwargs["request_timeout"].cast<std::chrono::milliseconds>() : -1ms};
|
||||
send_option::queue_failure qfail;
|
||||
if (kwargs.contains("queue_failure"))
|
||||
qfail.callback = [f = kwargs["queue_failure"].cast<std::function<void(std::string error)>>()]
|
||||
(const zmq::error_t* exc) {
|
||||
if (exc)
|
||||
f(exc->what());
|
||||
};
|
||||
send_option::queue_full qfull;
|
||||
if (kwargs.contains("queue_full"))
|
||||
qfull.callback = kwargs["queue_full"].cast<std::function<void()>>();
|
||||
|
||||
std::vector<std::string> data;
|
||||
for (auto arg: args)
|
||||
extract_data_parts(data, arg);
|
||||
|
||||
if (!request) {
|
||||
self.send(std::get<ConnectionID>(conn), command, send_option::data_parts(data),
|
||||
hint, optional, incoming, outgoing, keep_alive, request_timeout,
|
||||
std::move(qfail), std::move(qfull));
|
||||
} else {
|
||||
auto reply_cb = [on_reply = std::move(on_reply), on_fail = std::move(on_reply_failure)]
|
||||
(bool success, std::vector<std::string> data) mutable {
|
||||
// The gil here makes things tricky: the function invocation itself is
|
||||
// already gil protected, but the *destruction* of the lambda isn't, and
|
||||
// that breaks things because the destruction frees a python reference to
|
||||
// the callback. However oxenmq invokes this callback exactly once so we
|
||||
// can deal with it by stealing the captures out of the lambda to force
|
||||
// destruction here, with the gil held.
|
||||
py::gil_scoped_acquire gil;
|
||||
auto reply = std::move(on_reply);
|
||||
auto fail = std::move(on_fail);
|
||||
|
||||
if (success ? !reply : !fail)
|
||||
return;
|
||||
py::list l;
|
||||
if (success) {
|
||||
for (const auto& part : data)
|
||||
l.append(py::memoryview::from_memory(part.data(), part.size()));
|
||||
(*reply)(l);
|
||||
} else if (on_fail) {
|
||||
for (const auto& part : data)
|
||||
l.append(py::bytes(part.data(), part.size()));
|
||||
(*fail)(l);
|
||||
}
|
||||
};
|
||||
|
||||
self.request(std::get<ConnectionID>(conn), command, std::move(reply_cb),
|
||||
send_option::data_parts(data),
|
||||
hint, optional, incoming, outgoing, keep_alive, request_timeout,
|
||||
std::move(qfail), std::move(qfull));
|
||||
}
|
||||
},
|
||||
"conn"_a, "command"_a,
|
||||
R"(Sends a message or request to a remote.
|
||||
|
||||
The message is passed to the internal proxy thread to be queued for delivery (i.e. this function
|
||||
does not block).
|
||||
|
||||
Parameters:
|
||||
|
||||
- conn - the ConnectionID or the 32-byte `bytes` pubkey to send the message to; the latter generally
|
||||
requires construction with a sn_lookup callback to resolve pubkeys to connection strings.
|
||||
|
||||
- command - the endpoint name, e.g. "category.command". This can be a command or a request endpoint
|
||||
on the remote, but note that to properly speak to request endpoints you will also need to specify
|
||||
`request=True` (or use the `.request()` wrapper method).
|
||||
|
||||
- args - any additional non-keyword arguments must be str, bytes, or iterables of strings or bytes.
|
||||
These will be flattened and sent as the data part of the message. str values are decoded to utf8;
|
||||
bytes values are sent as-is.
|
||||
|
||||
The following keyword arguments may be provided:
|
||||
|
||||
- request - if true, send this as a request rather than a command. This *must* be True for request
|
||||
endpoints and must be False (or omitted) for non-request endpoints. See also `request()` which
|
||||
wraps `send()` to specify this for you.
|
||||
|
||||
- on_reply - function to call when a response to the request is received, when making a request with
|
||||
request=True. The function will be invoked with a single argument of a list of memoryviews into
|
||||
the data returned by the remote side. Note that these views are only valid for the duration of
|
||||
the callback: if the data needs to be preserved beyond the callback then the callback must copy
|
||||
it. If this value is omitted or None then any successful response is simply discarded.
|
||||
|
||||
- on_reply_failure - function to call if we do not get a successful reply, either for a timeout or
|
||||
because the remote sent us a failure reply. Called with a list of bytes containing failure
|
||||
information. The most common responses are:
|
||||
|
||||
[b'TIMEOUT'] - no reply was received within the request timeout period
|
||||
[b'UNKNOWNCOMMAND'] - the remote did not understand the given request command
|
||||
[b'FORBIDDEN'] - this client does not have the requested access to invoke that command
|
||||
[b'FORBIDDEN_SN'] - the command is only available to service nodes and our pubkey was not
|
||||
recognized as an active service node pubkey.
|
||||
[b'NOT_A_SERVICE_NODE'] - the command is only invokable on service nodes, and the remote is not
|
||||
currently running as a service node.
|
||||
|
||||
Note, however, that empty responses are possible, and that the array can contain multiple elements
|
||||
if additional context is provided by the other end.
|
||||
|
||||
- request_timeout - how long to wait for a reply to the request before giving up and calling the
|
||||
reply callback with a failure status. The default, if unspecified, is 15 seconds. Should only be
|
||||
specified when request=True.
|
||||
|
||||
- queue_failure - a callback to invoke with an error message if we are unable to queue the message
|
||||
for some reason (e.g. because the recipient is no longer reachable available). This does *not*
|
||||
include an inability because we have too much queued already: see the next option for that. Note
|
||||
that queueing occurs in another thread and so this method will be invoked sometime *after* the
|
||||
send() call returns.
|
||||
|
||||
- queue_full - a callback to invoke if we are unable to queue the message because we have too many
|
||||
messages already queued for delivery to that remote. Typically this indicates some network
|
||||
connectivity problem preventing delivery, or the remote may be non-responsive.
|
||||
|
||||
For messages being sent to service nodes by pubkey (which requires having provided a `sn_lookup`
|
||||
function during construction) the following options are also available:
|
||||
|
||||
- optional - if True then only send this message if we are already connected to the given service
|
||||
node, but do not establish a new connection if we are not. Only has an effect when `conn` refers
|
||||
to a service node pubkey.
|
||||
|
||||
- incoming_only - if True then only send this message along an incoming connection from the
|
||||
service node pubkey, and do not send it if we have no such connection.
|
||||
|
||||
- outgoing - if True then only send this message to the remote using an outgoing connection; an
|
||||
existing connection will be used if already established, otherwise a new outgoing connection will
|
||||
be made. Unlike a send() call without this option, existing *incoming* connections from the
|
||||
referenced service nodes will not be used to send the message.
|
||||
|
||||
- keep_alive - if specified and set to a datetime.timedelta then the given value will be used for
|
||||
the keep-alive of an outgoing connection; for existing connections the keep-alive will be
|
||||
increased if currently shorter, and for new connections this sets the keep-alive. Has no effect
|
||||
if the messages uses an existing incoming connection.
|
||||
)")
|
||||
.def("request", [](py::handle self, py::args args, py::kwargs kwargs) {
|
||||
self.attr("send")(*args, **kwargs, "request"_a = true);
|
||||
},
|
||||
"Convenience shortcut for oxenmq.send(..., request=True)")
|
||||
.def("inject_task", &OxenMQ::inject_task,
|
||||
"category"_a, "command"_a, "remote"_a, "callback"_a,
|
||||
R"(Inject a callback as if it were a remotely invoked command.
|
||||
|
||||
This method takes a callback and queues it to be invoked as if it had been called remotely. This allows
|
||||
external command processing to be combined with oxenmq task scheduling.
|
||||
|
||||
For example, oxen-core uses this to handle RPC requests coming in over HTTP as if they were incoming
|
||||
OxenMQ RPC requests, with the same scheduling and queuing of requests applied to both HTTP and
|
||||
OxenMQ requests so that both are treated fairly in terms of processing priority.)")
|
||||
;
|
||||
|
||||
py::enum_<std::future_status>(mod, "future_status")
|
||||
.value("deferred", std::future_status::deferred)
|
||||
.value("ready", std::future_status::ready)
|
||||
.value("timeout", std::future_status::timeout);
|
||||
py::class_<std::future<py::list>>(mod, "ResultFuture",
|
||||
"Wrapper around a C++ future allowing inspecting and waiting for the result to become available.")
|
||||
.def("get", [](std::future<py::list>& f) {
|
||||
{
|
||||
py::gil_scoped_release no_gil;
|
||||
f.wait();
|
||||
}
|
||||
return f.get();
|
||||
}, "Gets the result (or raises an exception if the result raised an exception); must only be called once")
|
||||
.def("valid", [](std::future<py::list>& f) { return f.valid(); },
|
||||
"Returns true if the result is available")
|
||||
.def("wait", &std::future<py::list>::wait, py::call_guard<py::gil_scoped_release>(),
|
||||
"Waits indefinitely for the result to become available")
|
||||
.def("wait_for", &std::future<py::list>::template wait_for<double, std::ratio<1>>,
|
||||
py::call_guard<py::gil_scoped_release>(),
|
||||
"Waits up to the given timedelta for the result to become available")
|
||||
.def("wait_for", [](std::future<py::list>& f, double seconds) {
|
||||
return f.wait_for(std::chrono::duration<double>{seconds}); },
|
||||
py::call_guard<py::gil_scoped_release>(),
|
||||
"Waits up to the given number of seconds for the result to become available")
|
||||
.def("wait_until",
|
||||
&std::future<py::list>::template wait_until<std::chrono::system_clock, std::chrono::system_clock::duration>,
|
||||
py::call_guard<py::gil_scoped_release>(),
|
||||
"Wait until the given datetime for the result to become available")
|
||||
;
|
||||
|
||||
oxenmq.def("request_future", [](py::handle self, py::args args, py::kwargs kwargs) {
|
||||
if (kwargs.contains("on_reply") || kwargs.contains("on_reply_failure"))
|
||||
throw std::logic_error{"Cannot call request_wait(...) with on_reply= or on_reply_failure="};
|
||||
|
||||
auto result = std::make_shared<std::promise<py::list>>();
|
||||
auto fut = result->get_future();
|
||||
std::function on_reply = [result](py::list value) {
|
||||
assert(len(value) == 0 || py::isinstance<py::memoryview>(value[0]));
|
||||
for (int i = len(value) - 1; i >= 0; i--)
|
||||
value[i] = value[i].attr("tobytes")();
|
||||
result->set_value(std::move(value));
|
||||
};
|
||||
std::function on_fail = [result](py::list value) {
|
||||
if (len(value) > 0 && (std::string) py::bytes(value[0]) == "TIMEOUT"sv) {
|
||||
auto msg = len(value) > 1 ? (std::string) py::bytes(value[1]) : "Request timed out"s;
|
||||
PyErr_SetString(PyExc_TimeoutError, msg.c_str());
|
||||
result->set_exception(std::make_exception_ptr(py::error_already_set{}));
|
||||
}
|
||||
|
||||
std::string err;
|
||||
for (auto& m : value) {
|
||||
if (!err.empty()) err += ", ";
|
||||
err += py::str(m);
|
||||
}
|
||||
result->set_exception(std::make_exception_ptr(std::runtime_error{"Request failed: " + err}));
|
||||
};
|
||||
|
||||
self.attr("request")(*args, **kwargs,
|
||||
"on_reply"_a = std::move(on_reply),
|
||||
"on_reply_failure"_a = std::move(on_fail));
|
||||
return fut;
|
||||
}, R"(Initiate a request with a future.
|
||||
|
||||
Initiates a request and returns a future that is used to check and wait for a response to the
|
||||
request. Takes the same arguments as .request(...), but without the `on_reply` and
|
||||
`on_reply_failure` options.
|
||||
|
||||
This can be used to make a synchronous request by simply calling .get() on the returned future:
|
||||
|
||||
try:
|
||||
response = omq.request_future(conn, "cat.cmd", "data").get()
|
||||
except TimeoutError as e:
|
||||
print("Request timed out!")
|
||||
except Exception:
|
||||
print("Request failed :(")
|
||||
|
||||
More powerfully, you can issue multiple, parallel requests storing the returned futures then .get()
|
||||
all of them to collect the responses.)");
|
||||
}
|
||||
|
||||
} // namespace oxenmq
|
|
@ -0,0 +1,126 @@
|
|||
|
||||
from oxenmq import OxenMQ, Message, LogLevel, AuthLevel, Address
|
||||
import random
|
||||
import string
|
||||
from datetime import datetime, timedelta
|
||||
import time
|
||||
import pytest
|
||||
|
||||
|
||||
def echo(m: Message):
|
||||
return 'Hi!', m.data()
|
||||
|
||||
|
||||
def ohce(m: Message):
|
||||
return [bytes(reversed(x)) for x in reversed(m.data())], '!iH'
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def zmq_address():
|
||||
import os
|
||||
sock = './' + ''.join(random.choices(string.ascii_letters, k=20)) + '.sock'
|
||||
addr = 'ipc://' + sock
|
||||
yield addr
|
||||
os.remove(sock)
|
||||
|
||||
|
||||
def make_omqs(zmq_addr, start=True):
|
||||
omq1 = OxenMQ(log_level=LogLevel.trace)
|
||||
|
||||
addr = Address(zmq_addr, omq1.pubkey)
|
||||
|
||||
omq1.listen(addr.zmq_address, curve=True)
|
||||
|
||||
omq1.add_category('cat', AuthLevel.none) \
|
||||
.add_request_command('echo', echo) \
|
||||
.add_request_command('ohce', ohce)
|
||||
|
||||
omq2 = OxenMQ(log_level=LogLevel.trace)
|
||||
|
||||
if start:
|
||||
omq1.start()
|
||||
omq2.start()
|
||||
|
||||
return omq1, omq2, addr
|
||||
|
||||
|
||||
def test_requests(zmq_address):
|
||||
omq1, omq2, addr = make_omqs(zmq_address)
|
||||
|
||||
reply1, reply2 = None, None
|
||||
|
||||
def on_reply1(r):
|
||||
nonlocal reply1
|
||||
reply1 = [x.tobytes().decode() for x in r]
|
||||
|
||||
def on_reply2(r):
|
||||
nonlocal reply2
|
||||
reply2 = [x.tobytes().decode() for x in r]
|
||||
|
||||
c1 = omq2.connect_remote(addr)
|
||||
omq2.request(c1, 'cat.echo', 'fee', 'fi', 'fo', 'fum', on_reply=on_reply1)
|
||||
omq2.request(c1, 'cat.ohce', 'fee', 'fi', 'fo', 'fum', on_reply=on_reply2)
|
||||
|
||||
timeout = datetime.now() + timedelta(seconds=0.5)
|
||||
while None in (reply1, reply2) and datetime.now() < timeout:
|
||||
time.sleep(0.01)
|
||||
|
||||
assert reply1 == ['Hi!', 'fee', 'fi', 'fo', 'fum']
|
||||
assert reply2 == ['muf', 'of', 'if', 'eef', '!iH']
|
||||
|
||||
|
||||
def test_request_future(zmq_address):
|
||||
omq1, omq2, addr = make_omqs(zmq_address)
|
||||
c1 = omq2.connect_remote(addr)
|
||||
|
||||
reply3 = [x.decode() for x in omq2.request_future(c1, 'cat.echo', 'xyz').get()]
|
||||
assert reply3 == ['Hi!', 'xyz']
|
||||
|
||||
|
||||
def test_commands(zmq_address):
|
||||
omq1, omq2, addr = make_omqs(zmq_address, start=False)
|
||||
|
||||
val1, val2, val3 = None, None, None
|
||||
defer = None
|
||||
|
||||
def cmd1(m):
|
||||
nonlocal val1
|
||||
val1 = ['CMD1 got'] + m.data()
|
||||
m.back("x.x", "asdf", b'\x00\x01\x02', "jkl;")
|
||||
|
||||
def cmd2(m):
|
||||
nonlocal val2, defer
|
||||
val2 = ['CMD2 got'] + m.data()
|
||||
defer = m.later()
|
||||
|
||||
def cmd_later(m):
|
||||
nonlocal defer, val3
|
||||
val3 = ['CMD-later got'] + m.data()
|
||||
|
||||
omq1.add_category("x", AuthLevel.none) \
|
||||
.add_command("y", cmd1) \
|
||||
.add_command("z", cmd_later)
|
||||
|
||||
omq2.add_category("x", AuthLevel.none) \
|
||||
.add_command("x", cmd2)
|
||||
|
||||
omq1.start()
|
||||
omq2.start()
|
||||
c1 = omq2.connect_remote(addr)
|
||||
omq2.send(c1, 'x.y', b'\0', 'M', 'G')
|
||||
|
||||
timeout = datetime.now() + timedelta(seconds=0.5)
|
||||
while None in (val1, val2) and datetime.now() < timeout:
|
||||
time.sleep(0.01)
|
||||
|
||||
assert val1 == ['CMD1 got', b'\0', b'M', b'G']
|
||||
assert val2 == ['CMD2 got', b'asdf', b'\0\1\2', b'jkl;']
|
||||
assert val3 is None
|
||||
assert defer is not None
|
||||
|
||||
defer.back("x.z", "cool")
|
||||
timeout = datetime.now() + timedelta(seconds=0.5)
|
||||
while val3 is None and datetime.now() < timeout:
|
||||
time.sleep(0.01)
|
||||
assert val3 == ['CMD-later got', b'cool']
|
||||
|
Loading…
Reference in New Issue