|
|
|
@ -622,28 +622,34 @@ permissions: in this example, the required permissions the access the endpoint w
|
|
|
|
|
OxenMQ::ConnectSuccess on_success,
|
|
|
|
|
OxenMQ::ConnectFailure on_failure,
|
|
|
|
|
std::chrono::milliseconds timeout,
|
|
|
|
|
std::optional<bool> ephemeral_routing_id) {
|
|
|
|
|
std::optional<bool> ephemeral_routing_id,
|
|
|
|
|
AuthLevel auth_level) {
|
|
|
|
|
|
|
|
|
|
return self.connect_remote(remote, std::move(on_success), std::move(on_failure),
|
|
|
|
|
connect_option::timeout{timeout},
|
|
|
|
|
connect_option::ephemeral_routing_id{ephemeral_routing_id.value_or(self.EPHEMERAL_ROUTING_ID)}
|
|
|
|
|
connect_option::ephemeral_routing_id{ephemeral_routing_id.value_or(self.EPHEMERAL_ROUTING_ID)},
|
|
|
|
|
auth_level
|
|
|
|
|
);
|
|
|
|
|
},
|
|
|
|
|
"remote"_a, "on_success"_a, "on_failure"_a,
|
|
|
|
|
kwonly,
|
|
|
|
|
"timeout"_a = oxenmq::REMOTE_CONNECT_TIMEOUT, "ephemeral_routing_id"_a = std::nullopt,
|
|
|
|
|
"timeout"_a = oxenmq::REMOTE_CONNECT_TIMEOUT,
|
|
|
|
|
"ephemeral_routing_id"_a = std::nullopt,
|
|
|
|
|
"auth_level"_a = AuthLevel::none,
|
|
|
|
|
R"(
|
|
|
|
|
Starts connecting to a remote address and return immediately. The connection can be used
|
|
|
|
|
immediately, however messages will only be queued until the connection is established (or dropped if
|
|
|
|
|
the connection fails). The given callbacks are invoked for success or failure.
|
|
|
|
|
|
|
|
|
|
`ephemeral_routing_id` and `timeout` allowing overriding the defaults (oxenmq.EPHEMERAL_ROUTING_ID
|
|
|
|
|
and 10s, respectively).
|
|
|
|
|
and 10s, respectively). `auth_level` can be specified to set the auth level of *incoming* requests
|
|
|
|
|
that arrive through this connection.
|
|
|
|
|
)")
|
|
|
|
|
.def("connect_remote", [](OxenMQ& self,
|
|
|
|
|
const address& remote,
|
|
|
|
|
std::chrono::milliseconds timeout,
|
|
|
|
|
std::optional<bool> ephemeral_routing_id) {
|
|
|
|
|
std::optional<bool> ephemeral_routing_id,
|
|
|
|
|
AuthLevel auth_level) {
|
|
|
|
|
std::promise<ConnectionID> promise;
|
|
|
|
|
self.connect_remote(
|
|
|
|
|
remote,
|
|
|
|
@ -653,10 +659,16 @@ and 10s, respectively).
|
|
|
|
|
std::runtime_error{"Connection failed: " + std::string{reason}}));
|
|
|
|
|
},
|
|
|
|
|
oxenmq::connect_option::timeout{timeout},
|
|
|
|
|
connect_option::ephemeral_routing_id{ephemeral_routing_id.value_or(self.EPHEMERAL_ROUTING_ID)}
|
|
|
|
|
connect_option::ephemeral_routing_id{ephemeral_routing_id.value_or(self.EPHEMERAL_ROUTING_ID)},
|
|
|
|
|
auth_level
|
|
|
|
|
);
|
|
|
|
|
return promise.get_future().get();
|
|
|
|
|
}, "remote"_a, "timeout"_a = oxenmq::REMOTE_CONNECT_TIMEOUT, "ephemeral_routing_id"_a = std::nullopt,
|
|
|
|
|
},
|
|
|
|
|
"remote"_a,
|
|
|
|
|
"timeout"_a = oxenmq::REMOTE_CONNECT_TIMEOUT,
|
|
|
|
|
kwonly,
|
|
|
|
|
"ephemeral_routing_id"_a = std::nullopt,
|
|
|
|
|
"auth_level"_a = AuthLevel::none,
|
|
|
|
|
R"(Simpler version of connect_remote that connects to a remote address synchronously.
|
|
|
|
|
|
|
|
|
|
This will block until the connection is established or times out; throws on connection failure,
|
|
|
|
@ -667,12 +679,14 @@ Takes the address and an optional `timeout` to override the timeout (default 10s
|
|
|
|
|
py::bytes pubkey,
|
|
|
|
|
std::optional<std::chrono::milliseconds> keep_alive,
|
|
|
|
|
std::optional<std::string> remote_hint,
|
|
|
|
|
std::optional<bool> ephemeral_routing_id) {
|
|
|
|
|
std::optional<bool> ephemeral_routing_id,
|
|
|
|
|
AuthLevel auth_level) {
|
|
|
|
|
return self.connect_sn(std::string{pubkey},
|
|
|
|
|
connect_option::keep_alive{keep_alive.value_or(-1ms)},
|
|
|
|
|
connect_option::hint{remote_hint.value_or("")},
|
|
|
|
|
connect_option::ephemeral_routing_id{ephemeral_routing_id.value_or(self.EPHEMERAL_ROUTING_ID)});
|
|
|
|
|
}, "pubkey"_a, kwonly, "keep_alive"_a, "remote_hint"_a, "ephemeral_routing_id"_a,
|
|
|
|
|
connect_option::ephemeral_routing_id{ephemeral_routing_id.value_or(self.EPHEMERAL_ROUTING_ID)},
|
|
|
|
|
auth_level);
|
|
|
|
|
}, "pubkey"_a, kwonly, "keep_alive"_a, "remote_hint"_a, "ephemeral_routing_id"_a, "auth_level"_a = AuthLevel::none,
|
|
|
|
|
R"(Connect to a remote service node by pubkey.
|
|
|
|
|
|
|
|
|
|
Try to initiate a connection to the given SN in anticipation of needing a connection in the future.
|
|
|
|
@ -699,6 +713,9 @@ Parameters:
|
|
|
|
|
- ephemeral_routing_id - if set, override the default OxenMQ.EPHEMERAL_ROUTING_ID for this
|
|
|
|
|
connection.
|
|
|
|
|
|
|
|
|
|
- auth_level - specified the authentication level for incoming commands (i.e. issued *to us*) over
|
|
|
|
|
this connection.
|
|
|
|
|
|
|
|
|
|
Returns a ConnectionID that identifies an connection with the given SN. Typically you *don't* need
|
|
|
|
|
to worry about saving this (and can just discard it): you can always simply pass the pubkey into
|
|
|
|
|
send/request methods to send to the SN by pubkey.
|
|
|
|
@ -740,12 +757,12 @@ the background).)")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool request = kwargs.contains("request") && kwargs["request"].cast<bool>();
|
|
|
|
|
std::optional<py::function> on_reply, on_reply_failure;
|
|
|
|
|
std::unique_ptr<py::function> on_reply, on_reply_failure;
|
|
|
|
|
if (request) {
|
|
|
|
|
if (kwargs.contains("on_reply"))
|
|
|
|
|
on_reply = kwargs["on_reply"].cast<py::function>();
|
|
|
|
|
on_reply = std::make_unique<py::function>(kwargs["on_reply"].cast<py::function>());
|
|
|
|
|
if (kwargs.contains("on_reply_failure"))
|
|
|
|
|
on_reply_failure = kwargs["on_reply_failure"].cast<py::function>();
|
|
|
|
|
on_reply_failure = std::make_unique<py::function>(kwargs["on_reply_failure"].cast<py::function>());
|
|
|
|
|
} else if (kwargs.contains("on_reply") || kwargs.contains("on_reply_failure")) {
|
|
|
|
|
throw std::logic_error{"Error: send(...) on_reply=/on_reply_failure= option "
|
|
|
|
|
"requires request=True (perhaps you meant to use `.request(...)` instead?)"};
|
|
|
|
@ -778,29 +795,32 @@ the background).)")
|
|
|
|
|
hint, optional, incoming, outgoing, keep_alive, request_timeout,
|
|
|
|
|
std::move(qfail), std::move(qfull));
|
|
|
|
|
} else {
|
|
|
|
|
auto reply_cb = [on_reply = std::move(on_reply), on_fail = std::move(on_reply_failure)]
|
|
|
|
|
(bool success, std::vector<std::string> data) mutable {
|
|
|
|
|
auto reply_cb = [reply_rawptr = on_reply.release(), fail_rawptr = on_reply_failure.release()]
|
|
|
|
|
(bool success, std::vector<std::string> data) {
|
|
|
|
|
// The gil here makes things tricky: the function invocation itself is
|
|
|
|
|
// already gil protected, but the *destruction* of the lambda isn't, and
|
|
|
|
|
// that breaks things because the destruction frees a python reference to
|
|
|
|
|
// the callback. However oxenmq invokes this callback exactly once so we
|
|
|
|
|
// can deal with it by stealing the captures out of the lambda to force
|
|
|
|
|
// destruction here, with the gil held.
|
|
|
|
|
// can deal with it by leaking raw pointers into the lambda captures then
|
|
|
|
|
// reclaiming them the one and only time we are called.
|
|
|
|
|
py::gil_scoped_acquire gil;
|
|
|
|
|
auto reply = std::move(on_reply);
|
|
|
|
|
auto fail = std::move(on_fail);
|
|
|
|
|
std::unique_ptr<py::function> reply{reply_rawptr};
|
|
|
|
|
std::unique_ptr<py::function> fail{fail_rawptr};
|
|
|
|
|
|
|
|
|
|
if (success ? !reply : !fail)
|
|
|
|
|
return;
|
|
|
|
|
py::list l;
|
|
|
|
|
if (success) {
|
|
|
|
|
for (const auto& part : data)
|
|
|
|
|
l.append(py::memoryview::from_memory(part.data(), part.size()));
|
|
|
|
|
(*reply)(l);
|
|
|
|
|
} else if (on_fail) {
|
|
|
|
|
for (const auto& part : data)
|
|
|
|
|
l.append(py::bytes(part.data(), part.size()));
|
|
|
|
|
(*fail)(l);
|
|
|
|
|
if (reply) {
|
|
|
|
|
py::list l;
|
|
|
|
|
for (const auto& part : data)
|
|
|
|
|
l.append(py::memoryview::from_memory(part.data(), part.size()));
|
|
|
|
|
(*reply)(l);
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
if (fail) {
|
|
|
|
|
py::list l;
|
|
|
|
|
for (const auto& part : data)
|
|
|
|
|
l.append(py::bytes(part.data(), part.size()));
|
|
|
|
|
(*fail)(l);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|