From 5ceed8a416534b88c0456175f132ea38482eddd5 Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Fri, 22 Oct 2021 16:27:54 -0300 Subject: [PATCH] Various fixes for bugs revealed in testing - make extract data parts always flatten to arbitrary depth. We were only flattening once (and only sometimes), but arbitrary flattening is nicer (as long as we don't ultimately encouter anything that isn't str/bytes). - fix Message.data()/.dataview() pre-loading nulls into the list. The py::list constructor argument here is pre-fill, not a reservation. - fix add_command: it was broken because when the wrapped callback gets invoked it tries casting the Message via copying, but that breaks (Message isn't copyable). Fix it by adding a wrapper that does a referencing cast to a Python object. - Add missing docs for add_command - Doc typo fixes - Remove OxenMQ default and loglevel constructors, and make the remaining constructor take everything by keyword arguments. (Default constructor remains equivalent, and the log-level constructor now has to have the log level named). - Fix OxenMQ keyword constructor: invoking it was failing because the annotation default on sn_lookup wasn't properly castable to the required types (it should be py::none() rather than nullptr). Also fix the defaults for pubkey/privkey to be py::bytes() (the empty string worked fine, but using bytes makes it show up as bytes defaults in the generated signature doc string). - Further document connect_inproc with a description of why you might want it. - Fix send not actually including the message data parts - Work around segfault in the reply callback: because we have lambdas with python object captures, we'd segfault in the proxy thread when it frees them after calling them because that freeing invokes python destructor but it doesn't hold the gil. Fixed by making the lambda consume its own callbacks (which is fine because oxenmq will invoke the callback exactly once). - Fix request_future invocation which raised an error on invocation: straight lambdas aren't castable to python objects, so stuff the lambda inside std::functions, which are. --- src/oxenmq.cpp | 84 ++++++++++++++++++++++++++++---------------------- 1 file changed, 48 insertions(+), 36 deletions(-) diff --git a/src/oxenmq.cpp b/src/oxenmq.cpp index bf776b2..0e21e2c 100644 --- a/src/oxenmq.cpp +++ b/src/oxenmq.cpp @@ -28,14 +28,8 @@ void extract_data_parts(std::vector& parts, py::handle obj) { else if (py::isinstance(obj)) parts.push_back(obj.cast()); else if (py::isinstance(obj)) { - for (auto o : obj) { - if (py::isinstance(o)) - parts.push_back(o.cast()); - else if (py::isinstance(o)) - parts.push_back(o.cast()); - else - throw std::runtime_error{"invalid iterable containing '" + std::string{py::repr(o)} + "': expected bytes/str"}; - } + for (auto o : obj) + extract_data_parts(parts, o); } else { throw std::runtime_error{"invalid value '" + std::string{py::repr(obj)} + "': expected bytes/str/iterable"}; } @@ -45,12 +39,6 @@ std::vector extract_data_parts(py::handle obj) { extract_data_parts(parts, obj); return parts; } -std::vector extract_data_parts(py::args& args) { - std::vector data; - for (auto arg: args) - extract_data_parts(data, arg); - return data; -} // Quick and dirty logger that logs to stderr. It would be much nicer to take a python function, // but that deadlocks pretty much right away because of the crappiness of the gil. @@ -173,7 +161,7 @@ Typically the IP address string for TCP connections and "localhost:UID:GID:PID" .def_readonly("access", &Message::access, "The access level of the invoker (which can be higher than the access level required for the command category") .def("dataview", [](const Message& m) { - py::list l{m.data.size()}; + py::list l; for (auto& part : m.data) l.append(py::memoryview::from_memory(part.data(), part.size())); return l; @@ -185,7 +173,7 @@ Message; if you need them beyond that then you must copy them (e.g. by calling m or .to_bytes() on each one)" ) .def("data", [](const Message& m) { - py::list l{m.data.size()}; + py::list l; for (auto& part : m.data) l.append(py::bytes{part.data(), part.size()}); return l; @@ -259,7 +247,20 @@ instance is still alive).)") py::class_(mod, "Category", "Helper class to add in registering category commands, returned from OxenMQ.add_category(...)") - .def("add_command", &CatHelper::add_command) + .def("add_command", [](CatHelper& cat, std::string name, py::function cb) { + return cat.add_command(name, [cb=std::move(cb)](Message& m) { + py::gil_scoped_acquire gil; + cb(&m); + }); + }, + "name"_a, "callback"_a, + R"(Add a command handler to this category. + +Adds a command, that is a command that is typically some sort of instruction that requires no reply. +(For a more typically request-response interface use .add_request_command instead). + +The callback is passed a `Message` object containing details of the received message. Note that +this object must *not* be stored beyond the callback itself; see `Message` for details.)") .def("add_request_command", [](CatHelper& cat, std::string name, @@ -283,14 +284,17 @@ instance is still alive).)") } msg.send_reply(send_option::data_parts(result)); }); + return &cat; }, + "name"_a, "handler"_a, R"(Add a request command to this category. Adds a request command, that is, a command that is always expected to reply, to this category. The -callback must return one of: +callback is passed a Message object containing details of the required message and must return one +of: - None - no reply will be sent; typically returned because you sent it yourself (via - Message.send_reply()), or because you want to send it later via Message.send_later(). + Message.reply()), or because you want to send it later via Message.later(). - bytes - will be sent as is in a single-part reply. - str - will be sent in utf-8 encoding in a single-part reply. - iterable object containing bytes and/or str elements: will be sent as a multi-part reply where @@ -306,12 +310,6 @@ callback itself.)") py::class_ oxenmq{mod, "OxenMQ"}; oxenmq - .def(py::init<>()) - .def(py::init([](LogLevel level) { - // Quick and dirty logger that logs to stderr. It would be much nicer to take a python - // function, but that deadlocks pretty much right away because of the crappiness of the gil. - return std::make_unique(stderr_logger{}, level); - })) .def(py::init([]( py::bytes pubkey, py::bytes privkey, @@ -322,8 +320,9 @@ callback itself.)") log_level ? OxenMQ::Logger{stderr_logger{}} : nullptr, log_level.value_or(LogLevel::warn)); }), - "pubkey"_a = "", "privkey"_a = "", "service_node"_a = false, - "sn_lookup"_a = nullptr, "log_level"_a = LogLevel::warn, + kwonly, + "pubkey"_a = py::bytes(), "privkey"_a = py::bytes(), "service_node"_a = false, + "sn_lookup"_a = py::none(), "log_level"_a = py::none(), R"(OxenMQ constructor. This constructs the object but does not start it; you will typically want to first add categories @@ -691,6 +690,9 @@ Connects to the built-in in-process listening socket of this OxenMQ server for l Note that auth_level defaults to admin (unlike connect_remote), and the default timeout is much shorter. +This connection is designed to allow code within the same process to invoke registered commands via +the OxenMQ object. The connection works whether or not there are any accessible external listeners. + Also note that incoming inproc requests are unauthenticated: that is, they will always have admin-level access. )") @@ -751,29 +753,38 @@ the background).)") extract_data_parts(data, arg); if (!request) { - self.send(std::get(conn), command, + self.send(std::get(conn), command, send_option::data_parts(data), hint, optional, incoming, outgoing, keep_alive, request_timeout, std::move(qfail), std::move(qfull)); } else { auto reply_cb = [on_reply = std::move(on_reply), on_fail = std::move(on_reply_failure)] - (bool success, std::vector data) { - - if (success ? !on_reply : !on_fail) - return; + (bool success, std::vector data) mutable { + // The gil here makes things tricky: the function invocation itself is + // already gil protected, but the *destruction* of the lambda isn't, and + // that breaks things because the destruction frees a python reference to + // the callback. However oxenmq invokes this callback exactly once so we + // can deal with it by stealing the captures out of the lambda to force + // destruction here, with the gil held. py::gil_scoped_acquire gil; + auto reply = std::move(on_reply); + auto fail = std::move(on_fail); + + if (success ? !reply : !fail) + return; py::list l; if (success) { for (const auto& part : data) l.append(py::memoryview::from_memory(part.data(), part.size())); - (*on_reply)(l); + (*reply)(l); } else if (on_fail) { for (const auto& part : data) l.append(py::bytes(part.data(), part.size())); - (*on_fail)(l); + (*fail)(l); } }; self.request(std::get(conn), command, std::move(reply_cb), + send_option::data_parts(data), hint, optional, incoming, outgoing, keep_alive, request_timeout, std::move(qfail), std::move(qfull)); } @@ -910,12 +921,13 @@ OxenMQ requests so that both are treated fairly in terms of processing priority. auto result = std::make_shared>(); auto fut = result->get_future(); - auto on_reply = [result](py::list value) { + std::function on_reply = [result](py::list value) { assert(len(value) == 0 || py::isinstance(value[0])); for (int i = len(value) - 1; i >= 0; i--) value[i] = value[i].attr("tobytes")(); + result->set_value(std::move(value)); }; - auto on_fail = [result](py::list value) { + std::function on_fail = [result](py::list value) { if (len(value) > 0 && (std::string) py::bytes(value[0]) == "TIMEOUT"sv) { auto msg = len(value) > 1 ? (std::string) py::bytes(value[1]) : "Request timed out"s; PyErr_SetString(PyExc_TimeoutError, msg.c_str());