mirror of https://github.com/oxen-io/oxen-mq.git
Take lokimq::address as connect_remote argument
Deprecates the existing connect_remote() that takes remote addr and pubkey as separate strings, just taking a `address` instead (into which the caller can set pubkey/curve data as desired). Also slightly changes how `connect_remote()` works when called with a string remote but no pubkey: that string is now an augmented lokimq::address string so that it can use the various formats supported by `lokimq::address`. (This was meant to be included in the PR that added `address` but apparently didn't get implemented.)
This commit is contained in:
parent
8a56b18cc6
commit
07b31bd8a1
|
@ -181,6 +181,20 @@ address::address(std::string_view addr) {
|
|||
throw std::invalid_argument{"Invalid trailing garbage '" + std::string{addr} + "' in address"};
|
||||
}
|
||||
|
||||
address& address::set_pubkey(std::string_view pk) {
|
||||
if (pk.size() == 0) {
|
||||
if (protocol == proto::tcp_curve) protocol = proto::tcp;
|
||||
else if (protocol == proto::ipc_curve) protocol = proto::ipc;
|
||||
} else if (pk.size() == 32) {
|
||||
if (protocol == proto::tcp) protocol = proto::tcp_curve;
|
||||
else if (protocol == proto::ipc) protocol = proto::ipc_curve;
|
||||
} else {
|
||||
throw std::invalid_argument{"Invalid pubkey passed to set_pubkey(): require 0- or 32-byte pubkey"};
|
||||
}
|
||||
pubkey = pk;
|
||||
return *this;
|
||||
}
|
||||
|
||||
std::string address::encode_pubkey(encoding enc) const {
|
||||
std::string pk;
|
||||
if (enc == encoding::hex)
|
||||
|
|
|
@ -120,6 +120,26 @@ struct address {
|
|||
*/
|
||||
address(std::string_view addr);
|
||||
|
||||
/** Constructs an address from a remote string and a separate pubkey. Typically `remote` is a
|
||||
* basic ZMQ connect string, though this is not enforced. Any pubkey information embedded in
|
||||
* the remote string will be discarded and replaced with the given pubkey string. The result
|
||||
* will be curve encrypted if `pubkey` is non-empty, plaintext if `pubkey` is empty.
|
||||
*
|
||||
* Throws an exception if either addr or pubkey is invalid.
|
||||
*
|
||||
* Exactly equivalent to `address a{remote}; a.set_pubkey(pubkey);`
|
||||
*/
|
||||
address(std::string_view addr, std::string_view pubkey) : address(addr) { set_pubkey(pubkey); }
|
||||
|
||||
/// Replaces the address's pubkey (if any) with the given pubkey (or no pubkey if empty). If
|
||||
/// changing from pubkey to no-pubkey or no-pubkey to pubkey then the protocol is update to
|
||||
/// switch to or from curve encryption.
|
||||
///
|
||||
/// pubkey should be the 32-byte binary pubkey, or an empty string to remove an existing pubkey.
|
||||
///
|
||||
/// Returns the object itself, so that you can chain it.
|
||||
address& set_pubkey(std::string_view pubkey);
|
||||
|
||||
/// Constructs and builds the ZMQ connection address from the stored connection details. This
|
||||
/// does not contain any of the curve-related details; those must be specified separately when
|
||||
/// interfacing with ZMQ.
|
||||
|
|
|
@ -76,30 +76,32 @@ ConnectionID LokiMQ::connect_sn(std::string_view pubkey, std::chrono::millisecon
|
|||
return pubkey;
|
||||
}
|
||||
|
||||
ConnectionID LokiMQ::connect_remote(std::string_view remote, ConnectSuccess on_connect, ConnectFailure on_failure,
|
||||
std::string_view pubkey, AuthLevel auth_level, std::chrono::milliseconds timeout) {
|
||||
ConnectionID LokiMQ::connect_remote(const address& remote, ConnectSuccess on_connect, ConnectFailure on_failure,
|
||||
AuthLevel auth_level, std::chrono::milliseconds timeout) {
|
||||
if (!proxy_thread.joinable())
|
||||
throw std::logic_error("Cannot call connect_remote() before calling `start()`");
|
||||
|
||||
if (remote.size() < 7 || !(remote.substr(0, 6) == "tcp://" || remote.substr(0, 6) == "ipc://" /* unix domain sockets */))
|
||||
throw std::runtime_error("Invalid connect_remote: remote address '" + std::string{remote} + "' is not a valid or supported zmq connect string");
|
||||
|
||||
auto id = next_conn_id++;
|
||||
LMQ_TRACE("telling proxy to connect to ", remote, ", id ", id,
|
||||
pubkey.empty() ? "using NULL auth" : ", using CURVE with remote pubkey [" + to_hex(pubkey) + "]");
|
||||
LMQ_TRACE("telling proxy to connect to ", remote, ", id ", id);
|
||||
detail::send_control(get_control_socket(), "CONNECT_REMOTE", bt_serialize<bt_dict>({
|
||||
{"auth_level", static_cast<std::underlying_type_t<AuthLevel>>(auth_level)},
|
||||
{"conn_id", id},
|
||||
{"connect", detail::serialize_object(std::move(on_connect))},
|
||||
{"failure", detail::serialize_object(std::move(on_failure))},
|
||||
{"pubkey", pubkey},
|
||||
{"remote", remote},
|
||||
{"pubkey", remote.curve() ? remote.pubkey : ""},
|
||||
{"remote", remote.zmq_address()},
|
||||
{"timeout", timeout.count()},
|
||||
}));
|
||||
|
||||
return id;
|
||||
}
|
||||
|
||||
ConnectionID LokiMQ::connect_remote(std::string_view remote, ConnectSuccess on_connect, ConnectFailure on_failure,
|
||||
std::string_view pubkey, AuthLevel auth_level, std::chrono::milliseconds timeout) {
|
||||
return connect_remote(address{remote}.set_pubkey(pubkey),
|
||||
std::move(on_connect), std::move(on_failure), auth_level, timeout);
|
||||
}
|
||||
|
||||
void LokiMQ::disconnect(ConnectionID id, std::chrono::milliseconds linger) {
|
||||
detail::send_control(get_control_socket(), "DISCONNECT", bt_serialize<bt_dict>({
|
||||
{"conn_id", id.id},
|
||||
|
|
|
@ -44,6 +44,7 @@
|
|||
#include <cassert>
|
||||
#include <cstdint>
|
||||
#include "zmq.hpp"
|
||||
#include "address.h"
|
||||
#include "bt_serialize.h"
|
||||
#include "connections.h"
|
||||
#include "message.h"
|
||||
|
@ -967,12 +968,11 @@ public:
|
|||
* The `on_connect` and `on_failure` callbacks are invoked when a connection has been
|
||||
* established or failed to establish.
|
||||
*
|
||||
* @param remote the remote connection address, such as `tcp://localhost:1234`.
|
||||
* @param remote the remote connection address either as implicitly from a string or as a full
|
||||
* lokimq::address object; see address.h for details. This specifies both the connection
|
||||
* address and whether curve encryption should be used.
|
||||
* @param on_connect called with the identifier after the connection has been established.
|
||||
* @param on_failure called with the identifier and failure message if we fail to connect.
|
||||
* @param pubkey if non-empty then connect securely (using curve encryption) and verify that the
|
||||
* remote's pubkey equals the given value. Specifying this is similar to using connect_sn()
|
||||
* except that we do not treat the remote as a SN for command authorization purposes.
|
||||
* @param auth_level determines the authentication level of the remote for issuing commands to
|
||||
* us. The default is `AuthLevel::none`.
|
||||
* @param timeout how long to try before aborting the connection attempt and calling the
|
||||
|
@ -982,8 +982,23 @@ public:
|
|||
* @param returns ConnectionID that uniquely identifies the connection to this remote node. In
|
||||
* order to talk to it you will need the returned value (or a copy of it).
|
||||
*/
|
||||
ConnectionID connect_remote(const address& remote, ConnectSuccess on_connect, ConnectFailure on_failure,
|
||||
AuthLevel auth_level = AuthLevel::none, std::chrono::milliseconds timeout = REMOTE_CONNECT_TIMEOUT);
|
||||
|
||||
/// Same as the above, but takes the address as a string_view and constructs an `address` from
|
||||
/// it.
|
||||
ConnectionID connect_remote(std::string_view remote, ConnectSuccess on_connect, ConnectFailure on_failure,
|
||||
std::string_view pubkey = {},
|
||||
AuthLevel auth_level = AuthLevel::none, std::chrono::milliseconds timeout = REMOTE_CONNECT_TIMEOUT) {
|
||||
return connect_remote(address{remote}, std::move(on_connect), std::move(on_failure), auth_level, timeout);
|
||||
}
|
||||
|
||||
/// Deprecated version of the above that takes the remote address and remote pubkey for curve
|
||||
/// encryption as separate arguments. New code should either use a pubkey-embedded address
|
||||
/// string, or specify remote address and pubkey with an `address` object such as:
|
||||
/// connect_remote(address{remote, pubkey}, ...)
|
||||
[[deprecated("use connect_remote() with a lokimq::address instead")]]
|
||||
ConnectionID connect_remote(std::string_view remote, ConnectSuccess on_connect, ConnectFailure on_failure,
|
||||
std::string_view pubkey,
|
||||
AuthLevel auth_level = AuthLevel::none,
|
||||
std::chrono::milliseconds timeout = REMOTE_CONNECT_TIMEOUT);
|
||||
|
||||
|
|
|
@ -41,10 +41,9 @@ TEST_CASE("basic commands", "[commands]") {
|
|||
bool success = false, failed = false;
|
||||
std::string pubkey;
|
||||
|
||||
auto c = client.connect_remote(listen,
|
||||
auto c = client.connect_remote(address{listen, server.get_pubkey()},
|
||||
[&](auto conn) { pubkey = conn.pubkey(); success = true; got = true; },
|
||||
[&](auto conn, std::string_view) { failed = true; got = true; },
|
||||
server.get_pubkey());
|
||||
[&](auto conn, std::string_view) { failed = true; got = true; });
|
||||
|
||||
wait_for_conn(got);
|
||||
{
|
||||
|
@ -107,9 +106,10 @@ TEST_CASE("outgoing auth level", "[commands][auth]") {
|
|||
|
||||
client.PUBKEY_BASED_ROUTING_ID = false; // establishing multiple connections below, so we need unique routing ids
|
||||
|
||||
auto public_c = client.connect_remote(listen, [](auto&&...) {}, [](auto&&...) {}, server.get_pubkey());
|
||||
auto basic_c = client.connect_remote(listen, [](auto&&...) {}, [](auto&&...) {}, server.get_pubkey(), AuthLevel::basic);
|
||||
auto admin_c = client.connect_remote(listen, [](auto&&...) {}, [](auto&&...) {}, server.get_pubkey(), AuthLevel::admin);
|
||||
address server_addr{listen, server.get_pubkey()};
|
||||
auto public_c = client.connect_remote(server_addr, [](auto&&...) {}, [](auto&&...) {});
|
||||
auto basic_c = client.connect_remote(server_addr, [](auto&&...) {}, [](auto&&...) {}, AuthLevel::basic);
|
||||
auto admin_c = client.connect_remote(server_addr, [](auto&&...) {}, [](auto&&...) {}, AuthLevel::admin);
|
||||
|
||||
client.send(public_c, "public.reflect", "public.hi");
|
||||
wait_for([&] { return public_hi == 1; });
|
||||
|
@ -206,7 +206,7 @@ TEST_CASE("deferred replies on incoming connections", "[commands][hey google]")
|
|||
backdoor_details.emplace(m.data[0]);
|
||||
});
|
||||
nsa.start();
|
||||
auto nsa_c = nsa.connect_remote(listen, connect_success, connect_failure, server.get_pubkey(), AuthLevel::admin);
|
||||
auto nsa_c = nsa.connect_remote(address{listen, server.get_pubkey()}, connect_success, connect_failure, AuthLevel::admin);
|
||||
nsa.send(nsa_c, "hey google.install backdoor");
|
||||
|
||||
wait_for([&] { auto lock = catch_lock(); return (bool) backdoor; });
|
||||
|
@ -227,6 +227,7 @@ TEST_CASE("deferred replies on incoming connections", "[commands][hey google]")
|
|||
std::set<std::string> all_the_things;
|
||||
for (auto& pd : personal_details) all_the_things.insert(pd.second.begin(), pd.second.end());
|
||||
|
||||
address server_addr{listen, server.get_pubkey()};
|
||||
std::map<int, std::set<std::string>> google_knows;
|
||||
int things_remembered{0};
|
||||
for (int i = 0; i < 5; i++) {
|
||||
|
@ -241,7 +242,7 @@ TEST_CASE("deferred replies on incoming connections", "[commands][hey google]")
|
|||
});
|
||||
c->start();
|
||||
conns.push_back(
|
||||
c->connect_remote(listen, connect_success, connect_failure, server.get_pubkey(), AuthLevel::basic));
|
||||
c->connect_remote(server_addr, connect_success, connect_failure, AuthLevel::basic));
|
||||
for (auto& personal_detail : personal_details[i])
|
||||
c->request(conns.back(), "hey google.remember",
|
||||
[&](bool success, std::vector<std::string> data) {
|
||||
|
|
|
@ -27,10 +27,9 @@ TEST_CASE("connections with curve authentication", "[curve][connect]") {
|
|||
auto pubkey = server.get_pubkey();
|
||||
std::atomic<bool> got{false};
|
||||
bool success = false;
|
||||
auto server_conn = client.connect_remote(listen,
|
||||
auto server_conn = client.connect_remote(address{listen, pubkey},
|
||||
[&](auto conn) { success = true; got = true; },
|
||||
[&](auto conn, std::string_view reason) { auto lock = catch_lock(); INFO("connection failed: " << reason); got = true; },
|
||||
pubkey);
|
||||
[&](auto conn, std::string_view reason) { auto lock = catch_lock(); INFO("connection failed: " << reason); got = true; });
|
||||
|
||||
wait_for_conn(got);
|
||||
{
|
||||
|
|
|
@ -30,10 +30,9 @@ TEST_CASE("basic requests", "[requests]") {
|
|||
std::atomic<bool> connected{false}, failed{false};
|
||||
std::string pubkey;
|
||||
|
||||
auto c = client.connect_remote(listen,
|
||||
auto c = client.connect_remote(address{listen, server.get_pubkey()},
|
||||
[&](auto conn) { pubkey = conn.pubkey(); connected = true; },
|
||||
[&](auto, auto) { failed = true; },
|
||||
server.get_pubkey());
|
||||
[&](auto, auto) { failed = true; });
|
||||
|
||||
wait_for([&] { return connected || failed; });
|
||||
{
|
||||
|
@ -88,10 +87,9 @@ TEST_CASE("request from server to client", "[requests]") {
|
|||
std::atomic<bool> connected{false}, failed{false};
|
||||
std::string pubkey;
|
||||
|
||||
auto c = client.connect_remote(listen,
|
||||
auto c = client.connect_remote(address{listen, server.get_pubkey()},
|
||||
[&](auto conn) { pubkey = conn.pubkey(); connected = true; },
|
||||
[&](auto, auto) { failed = true; },
|
||||
server.get_pubkey());
|
||||
[&](auto, auto) { failed = true; });
|
||||
|
||||
int i;
|
||||
for (i = 0; i < 5; i++) {
|
||||
|
@ -151,10 +149,9 @@ TEST_CASE("request timeouts", "[requests][timeout]") {
|
|||
std::atomic<bool> connected{false}, failed{false};
|
||||
std::string pubkey;
|
||||
|
||||
auto c = client.connect_remote(listen,
|
||||
auto c = client.connect_remote(address{listen, server.get_pubkey()},
|
||||
[&](auto conn) { pubkey = conn.pubkey(); connected = true; },
|
||||
[&](auto, auto) { failed = true; },
|
||||
server.get_pubkey());
|
||||
[&](auto, auto) { failed = true; });
|
||||
|
||||
wait_for([&] { return connected || failed; });
|
||||
|
||||
|
|
Loading…
Reference in New Issue