mirror of
https://github.com/oxen-io/oxen-pyoxenmq.git
synced 2023-12-13 20:50:34 +01:00
Merge remote-tracking branch 'origin/stable' into debian/sid
This commit is contained in:
commit
eb07d4c9b9
2 changed files with 23 additions and 20 deletions
2
setup.py
2
setup.py
|
@ -3,7 +3,7 @@ from setuptools import setup
|
||||||
# Available at setup time due to pyproject.toml
|
# Available at setup time due to pyproject.toml
|
||||||
from pybind11.setup_helpers import Pybind11Extension, build_ext
|
from pybind11.setup_helpers import Pybind11Extension, build_ext
|
||||||
|
|
||||||
__version__ = "1.0.4"
|
__version__ = "1.0.5"
|
||||||
|
|
||||||
# Note:
|
# Note:
|
||||||
# Sort input source files if you glob sources to ensure bit-for-bit
|
# Sort input source files if you glob sources to ensure bit-for-bit
|
||||||
|
|
|
@ -757,12 +757,12 @@ the background).)")
|
||||||
}
|
}
|
||||||
|
|
||||||
bool request = kwargs.contains("request") && kwargs["request"].cast<bool>();
|
bool request = kwargs.contains("request") && kwargs["request"].cast<bool>();
|
||||||
std::optional<py::function> on_reply, on_reply_failure;
|
std::unique_ptr<py::function> on_reply, on_reply_failure;
|
||||||
if (request) {
|
if (request) {
|
||||||
if (kwargs.contains("on_reply"))
|
if (kwargs.contains("on_reply"))
|
||||||
on_reply = kwargs["on_reply"].cast<py::function>();
|
on_reply = std::make_unique<py::function>(kwargs["on_reply"].cast<py::function>());
|
||||||
if (kwargs.contains("on_reply_failure"))
|
if (kwargs.contains("on_reply_failure"))
|
||||||
on_reply_failure = kwargs["on_reply_failure"].cast<py::function>();
|
on_reply_failure = std::make_unique<py::function>(kwargs["on_reply_failure"].cast<py::function>());
|
||||||
} else if (kwargs.contains("on_reply") || kwargs.contains("on_reply_failure")) {
|
} else if (kwargs.contains("on_reply") || kwargs.contains("on_reply_failure")) {
|
||||||
throw std::logic_error{"Error: send(...) on_reply=/on_reply_failure= option "
|
throw std::logic_error{"Error: send(...) on_reply=/on_reply_failure= option "
|
||||||
"requires request=True (perhaps you meant to use `.request(...)` instead?)"};
|
"requires request=True (perhaps you meant to use `.request(...)` instead?)"};
|
||||||
|
@ -795,29 +795,32 @@ the background).)")
|
||||||
hint, optional, incoming, outgoing, keep_alive, request_timeout,
|
hint, optional, incoming, outgoing, keep_alive, request_timeout,
|
||||||
std::move(qfail), std::move(qfull));
|
std::move(qfail), std::move(qfull));
|
||||||
} else {
|
} else {
|
||||||
auto reply_cb = [on_reply = std::move(on_reply), on_fail = std::move(on_reply_failure)]
|
auto reply_cb = [reply_rawptr = on_reply.release(), fail_rawptr = on_reply_failure.release()]
|
||||||
(bool success, std::vector<std::string> data) mutable {
|
(bool success, std::vector<std::string> data) {
|
||||||
// The gil here makes things tricky: the function invocation itself is
|
// The gil here makes things tricky: the function invocation itself is
|
||||||
// already gil protected, but the *destruction* of the lambda isn't, and
|
// already gil protected, but the *destruction* of the lambda isn't, and
|
||||||
// that breaks things because the destruction frees a python reference to
|
// that breaks things because the destruction frees a python reference to
|
||||||
// the callback. However oxenmq invokes this callback exactly once so we
|
// the callback. However oxenmq invokes this callback exactly once so we
|
||||||
// can deal with it by stealing the captures out of the lambda to force
|
// can deal with it by leaking raw pointers into the lambda captures then
|
||||||
// destruction here, with the gil held.
|
// reclaiming them the one and only time we are called.
|
||||||
py::gil_scoped_acquire gil;
|
py::gil_scoped_acquire gil;
|
||||||
auto reply = std::move(on_reply);
|
std::unique_ptr<py::function> reply{reply_rawptr};
|
||||||
auto fail = std::move(on_fail);
|
std::unique_ptr<py::function> fail{fail_rawptr};
|
||||||
|
|
||||||
if (success ? !reply : !fail)
|
|
||||||
return;
|
|
||||||
py::list l;
|
|
||||||
if (success) {
|
if (success) {
|
||||||
for (const auto& part : data)
|
if (reply) {
|
||||||
l.append(py::memoryview::from_memory(part.data(), part.size()));
|
py::list l;
|
||||||
(*reply)(l);
|
for (const auto& part : data)
|
||||||
} else if (on_fail) {
|
l.append(py::memoryview::from_memory(part.data(), part.size()));
|
||||||
for (const auto& part : data)
|
(*reply)(l);
|
||||||
l.append(py::bytes(part.data(), part.size()));
|
}
|
||||||
(*fail)(l);
|
} else {
|
||||||
|
if (fail) {
|
||||||
|
py::list l;
|
||||||
|
for (const auto& part : data)
|
||||||
|
l.append(py::bytes(part.data(), part.size()));
|
||||||
|
(*fail)(l);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue