Various fixes for bugs revealed in testing

- make extract data parts always flatten to arbitrary depth.  We were
  only flattening once (and only sometimes), but arbitrary flattening is
  nicer (as long as we don't ultimately encounter anything that isn't
  bytes or str).
- fix pre-loading nulls into the list.  The
  py::list constructor argument here is pre-fill, not a reservation.
- fix add_command: it was broken because when the wrapped callback gets
  invoked it tries casting the Message via copying, but that breaks
  (Message isn't copyable).  Fix it by adding a wrapper that does a
  referencing cast to a Python object.
- Add missing docs for add_command
- Doc typo fixes
- Remove OxenMQ default and loglevel constructors, and make the
  remaining constructor take everything by keyword arguments.  (Default
  constructor remains equivalent, and the log-level constructor now has
  to have the log level named).
- Fix OxenMQ keyword constructor: invoking it was failing because the
  annotation default on sn_lookup wasn't properly castable to the
  required types (it should be py::none() rather than nullptr).  Also
  fix the defaults for pubkey/privkey to be py::bytes() (the empty
  string worked fine, but using bytes makes it show up as bytes defaults
  in the generated signature doc string).
- Further document connect_inproc with a description of why you might
  want it.
- Fix send not actually including the message data parts
- Work around segfault in the reply callback: because we have lambdas
  with python object captures, we'd segfault in the proxy thread when it
  frees them after calling them, because that freeing invokes the python
  destructor while the thread doesn't hold the gil.  Fixed by making the
  lambda consume its own callbacks (which is fine because oxenmq will
  invoke the callback exactly once).
- Fix request_future invocation which raised an error on invocation:
  straight lambdas aren't castable to python objects, so stuff the
  lambda inside std::functions, which are.
This commit is contained in:
Jason Rhinelander 2021-10-22 16:27:54 -03:00
parent 6a382865b2
commit 5ceed8a416

View file

@ -28,14 +28,8 @@ void extract_data_parts(std::vector<std::string>& parts, py::handle obj) {
else if (py::isinstance<py::str>(obj))
else if (py::isinstance<py::iterable>(obj)) {
for (auto o : obj) {
if (py::isinstance<py::bytes>(o))
else if (py::isinstance<py::str>(o))
throw std::runtime_error{"invalid iterable containing '" + std::string{py::repr(o)} + "': expected bytes/str"};
for (auto o : obj)
extract_data_parts(parts, o);
} else {
throw std::runtime_error{"invalid value '" + std::string{py::repr(obj)} + "': expected bytes/str/iterable"};
@ -45,12 +39,6 @@ std::vector<std::string> extract_data_parts(py::handle obj) {
extract_data_parts(parts, obj);
return parts;
std::vector<std::string> extract_data_parts(py::args& args) {
std::vector<std::string> data;
for (auto arg: args)
extract_data_parts(data, arg);
return data;
// Quick and dirty logger that logs to stderr. It would be much nicer to take a python function,
// but that deadlocks pretty much right away because of the crappiness of the gil.
@ -173,7 +161,7 @@ Typically the IP address string for TCP connections and "localhost:UID:GID:PID"
.def_readonly("access", &Message::access,
"The access level of the invoker (which can be higher than the access level required for the command category")
.def("dataview", [](const Message& m) {
py::list l{};
py::list l;
for (auto& part :
l.append(py::memoryview::from_memory(, part.size()));
return l;
@ -185,7 +173,7 @@ Message; if you need them beyond that then you must copy them (e.g. by calling m
or .to_bytes() on each one)"
.def("data", [](const Message& m) {
py::list l{};
py::list l;
for (auto& part :
l.append(py::bytes{, part.size()});
return l;
@ -259,7 +247,20 @@ instance is still alive).)")
py::class_<CatHelper>(mod, "Category",
"Helper class to add in registering category commands, returned from OxenMQ.add_category(...)")
.def("add_command", &CatHelper::add_command)
.def("add_command", [](CatHelper& cat, std::string name, py::function cb) {
return cat.add_command(name, [cb=std::move(cb)](Message& m) {
py::gil_scoped_acquire gil;
"name"_a, "callback"_a,
R"(Add a command handler to this category.
Adds a command, that is a command that is typically some sort of instruction that requires no reply.
(For a more typically request-response interface use .add_request_command instead).
The callback is passed a `Message` object containing details of the received message. Note that
this object must *not* be stored beyond the callback itself; see `Message` for details.)")
[](CatHelper& cat,
std::string name,
@ -283,14 +284,17 @@ instance is still alive).)")
return &cat;
"name"_a, "handler"_a,
R"(Add a request command to this category.
Adds a request command, that is, a command that is always expected to reply, to this category. The
callback must return one of:
callback is passed a Message object containing details of the required message and must return one
- None - no reply will be sent; typically returned because you sent it yourself (via
Message.send_reply()), or because you want to send it later via Message.send_later().
Message.reply()), or because you want to send it later via Message.later().
- bytes - will be sent as is in a single-part reply.
- str - will be sent in utf-8 encoding in a single-part reply.
- iterable object containing bytes and/or str elements: will be sent as a multi-part reply where
@ -306,12 +310,6 @@ callback itself.)")
py::class_<OxenMQ> oxenmq{mod, "OxenMQ"};
.def(py::init([](LogLevel level) {
// Quick and dirty logger that logs to stderr. It would be much nicer to take a python
// function, but that deadlocks pretty much right away because of the crappiness of the gil.
return std::make_unique<OxenMQ>(stderr_logger{}, level);
py::bytes pubkey,
py::bytes privkey,
@ -322,8 +320,9 @@ callback itself.)")
log_level ? OxenMQ::Logger{stderr_logger{}} : nullptr,
"pubkey"_a = "", "privkey"_a = "", "service_node"_a = false,
"sn_lookup"_a = nullptr, "log_level"_a = LogLevel::warn,
"pubkey"_a = py::bytes(), "privkey"_a = py::bytes(), "service_node"_a = false,
"sn_lookup"_a = py::none(), "log_level"_a = py::none(),
R"(OxenMQ constructor.
This constructs the object but does not start it; you will typically want to first add categories
@ -691,6 +690,9 @@ Connects to the built-in in-process listening socket of this OxenMQ server for l
Note that auth_level defaults to admin (unlike connect_remote), and the default timeout is much
This connection is designed to allow code within the same process to invoke registered commands via
the OxenMQ object. The connection works whether or not there are any accessible external listeners.
Also note that incoming inproc requests are unauthenticated: that is, they will always have
admin-level access.
@ -751,29 +753,38 @@ the background).)")
extract_data_parts(data, arg);
if (!request) {
self.send(std::get<ConnectionID>(conn), command,
self.send(std::get<ConnectionID>(conn), command, send_option::data_parts(data),
hint, optional, incoming, outgoing, keep_alive, request_timeout,
std::move(qfail), std::move(qfull));
} else {
auto reply_cb = [on_reply = std::move(on_reply), on_fail = std::move(on_reply_failure)]
(bool success, std::vector<std::string> data) {
if (success ? !on_reply : !on_fail)
(bool success, std::vector<std::string> data) mutable {
// The gil here makes things tricky: the function invocation itself is
// already gil protected, but the *destruction* of the lambda isn't, and
// that breaks things because the destruction frees a python reference to
// the callback. However oxenmq invokes this callback exactly once so we
// can deal with it by stealing the captures out of the lambda to force
// destruction here, with the gil held.
py::gil_scoped_acquire gil;
auto reply = std::move(on_reply);
auto fail = std::move(on_fail);
if (success ? !reply : !fail)
py::list l;
if (success) {
for (const auto& part : data)
l.append(py::memoryview::from_memory(, part.size()));
} else if (on_fail) {
for (const auto& part : data)
l.append(py::bytes(, part.size()));
self.request(std::get<ConnectionID>(conn), command, std::move(reply_cb),
hint, optional, incoming, outgoing, keep_alive, request_timeout,
std::move(qfail), std::move(qfull));
@ -910,12 +921,13 @@ OxenMQ requests so that both are treated fairly in terms of processing priority.
auto result = std::make_shared<std::promise<py::list>>();
auto fut = result->get_future();
auto on_reply = [result](py::list value) {
std::function on_reply = [result](py::list value) {
assert(len(value) == 0 || py::isinstance<py::memoryview>(value[0]));
for (int i = len(value) - 1; i >= 0; i--)
value[i] = value[i].attr("tobytes")();
auto on_fail = [result](py::list value) {
std::function on_fail = [result](py::list value) {
if (len(value) > 0 && (std::string) py::bytes(value[0]) == "TIMEOUT"sv) {
auto msg = len(value) > 1 ? (std::string) py::bytes(value[1]) : "Request timed out"s;
PyErr_SetString(PyExc_TimeoutError, msg.c_str());