Rename PUBKEY_BASED_ROUTING_ID to EPHEMERAL_ROUTING_ID

And similarly for the connect_option
This commit is contained in:
Jason Rhinelander 2021-04-15 15:42:04 -03:00
parent 99a3f1d840
commit ac58e5b574
5 changed files with 45 additions and 46 deletions

View File

@ -47,7 +47,7 @@ void OxenMQ::setup_external_socket(zmq::socket_t& socket) {
}
}
void OxenMQ::setup_outgoing_socket(zmq::socket_t& socket, std::string_view remote_pubkey, bool use_pubkey_routing_id) {
void OxenMQ::setup_outgoing_socket(zmq::socket_t& socket, std::string_view remote_pubkey, bool use_ephemeral_routing_id) {
setup_external_socket(socket);
@ -57,7 +57,7 @@ void OxenMQ::setup_outgoing_socket(zmq::socket_t& socket, std::string_view remot
socket.set(zmq::sockopt::curve_secretkey, privkey);
}
if (use_pubkey_routing_id) {
if (!use_ephemeral_routing_id) {
std::string routing_id;
routing_id.reserve(33);
routing_id += 'L'; // Prefix because routing id's starting with \0 are reserved by zmq (and our pubkey might start with \0)
@ -90,7 +90,7 @@ void OxenMQ::disconnect(ConnectionID id, std::chrono::milliseconds linger) {
}
std::pair<zmq::socket_t *, std::string>
OxenMQ::proxy_connect_sn(std::string_view remote, std::string_view connect_hint, bool optional, bool incoming_only, bool outgoing_only, bool use_pubkey_routing_id, std::chrono::milliseconds keep_alive) {
OxenMQ::proxy_connect_sn(std::string_view remote, std::string_view connect_hint, bool optional, bool incoming_only, bool outgoing_only, bool use_ephemeral_routing_id, std::chrono::milliseconds keep_alive) {
ConnectionID remote_cid{remote};
auto its = peers.equal_range(remote_cid);
peer_info* peer = nullptr;
@ -142,7 +142,7 @@ OxenMQ::proxy_connect_sn(std::string_view remote, std::string_view connect_hint,
LMQ_LOG(debug, to_hex(pubkey), " (me) connecting to ", addr, " to reach ", to_hex(remote));
zmq::socket_t socket{context, zmq::socket_type::dealer};
setup_outgoing_socket(socket, remote, use_pubkey_routing_id);
setup_outgoing_socket(socket, remote, use_ephemeral_routing_id);
try {
socket.connect(addr);
} catch (const zmq::error_t& e) {
@ -168,9 +168,11 @@ OxenMQ::proxy_connect_sn(std::string_view remote, std::string_view connect_hint,
std::pair<zmq::socket_t *, std::string> OxenMQ::proxy_connect_sn(bt_dict_consumer data) {
std::string_view hint, remote_pk;
std::chrono::milliseconds keep_alive;
bool optional = false, incoming_only = false, outgoing_only = false, pubkey_routing = PUBKEY_BASED_ROUTING_ID;
bool optional = false, incoming_only = false, outgoing_only = false, ephemeral_rid = EPHEMERAL_ROUTING_ID;
// Alphabetical order
if (data.skip_until("ephemeral_rid"))
ephemeral_rid = data.consume_integer<bool>();
if (data.skip_until("hint"))
hint = data.consume_string_view();
if (data.skip_until("incoming"))
@ -184,10 +186,8 @@ std::pair<zmq::socket_t *, std::string> OxenMQ::proxy_connect_sn(bt_dict_consume
if (!data.skip_until("pubkey"))
throw std::runtime_error("Internal error: Invalid proxy_connect_sn command; pubkey missing");
remote_pk = data.consume_string_view();
if (data.skip_until("pubkey_routing"))
pubkey_routing = data.consume_integer<bool>();
return proxy_connect_sn(remote_pk, hint, optional, incoming_only, outgoing_only, pubkey_routing, keep_alive);
return proxy_connect_sn(remote_pk, hint, optional, incoming_only, outgoing_only, ephemeral_rid, keep_alive);
}
template <typename Container, typename AccessIndex>
@ -296,24 +296,22 @@ void OxenMQ::proxy_connect_remote(bt_dict_consumer data) {
std::string remote;
std::string remote_pubkey;
std::chrono::milliseconds timeout = REMOTE_CONNECT_TIMEOUT;
bool pubkey_routing = PUBKEY_BASED_ROUTING_ID;
bool ephemeral_rid = EPHEMERAL_ROUTING_ID;
if (data.skip_until("auth_level"))
auth_level = static_cast<AuthLevel>(data.consume_integer<std::underlying_type_t<AuthLevel>>());
if (data.skip_until("conn_id"))
conn_id = data.consume_integer<long long>();
if (data.skip_until("connect")) {
if (data.skip_until("connect"))
on_connect = detail::deserialize_object<ConnectSuccess>(data.consume_integer<uintptr_t>());
}
if (data.skip_until("failure")) {
if (data.skip_until("ephemeral_rid"))
ephemeral_rid = data.consume_integer<bool>();
if (data.skip_until("failure"))
on_failure = detail::deserialize_object<ConnectFailure>(data.consume_integer<uintptr_t>());
}
if (data.skip_until("pubkey")) {
remote_pubkey = data.consume_string();
assert(remote_pubkey.size() == 32 || remote_pubkey.empty());
}
if (data.skip_until("pubkey_routing"))
pubkey_routing = data.consume_integer<bool>();
if (data.skip_until("remote"))
remote = data.consume_string();
if (data.skip_until("timeout"))
@ -328,7 +326,7 @@ void OxenMQ::proxy_connect_remote(bt_dict_consumer data) {
zmq::socket_t sock{context, zmq::socket_type::dealer};
try {
setup_outgoing_socket(sock, remote_pubkey, pubkey_routing);
setup_outgoing_socket(sock, remote_pubkey, ephemeral_rid);
sock.connect(remote);
} catch (const zmq::error_t &e) {
proxy_schedule_reply_job([conn_id, on_failure=std::move(on_failure), what="connect() failed: "s+e.what()] {

View File

@ -198,21 +198,21 @@ public:
* closing the connection. Setting this only affects new outgoing connections. */
std::chrono::milliseconds HANDSHAKE_TIME = 10s;
/** Whether to use a zmq routing ID based on the pubkey for new outgoing connections. This is
* desirable when connections between endpoints are unique as it allows the listener to
* recognize that the incoming connection is a reconnection from the same remote and handover
* routing to the new socket while closing off the (likely dead) old socket. This, however,
* prevents a single OxenMQ instance (or multiple OxenMQ instances using the same keys) from
* establishing multiple connections to the same listening OxenMQ, which is sometimes useful
* (for example when testing, or when sharing an authentication key), and so this option can be
* overridden to `false` to use completely random zmq routing ids on outgoing connections (which
* will thus allow multiple connections).
/** Whether to use a random zmq routing ID, or one based on the pubkey for new outgoing
* connections. Using the pubkey is desirable when connections between endpoints are unique as
* it allows the listener to recognize that the incoming connection is a reconnection from the
* same remote and handover routing to the new socket while closing off the (likely dead) old
* socket. This, however, prevents a single OxenMQ instance (or multiple OxenMQ instances using
* the same keys) from establishing multiple connections to the same listening OxenMQ, which is
* sometimes useful (for example when testing, or when sharing an authentication key), and so
* this option can be overridden to `true` to use completely random zmq routing ids on outgoing
* connections (which will thus allow multiple connections).
*
* Note that this only affects the default for outgoing connections: you can override an
* individual connection by passing a connect_option::pubkey_routing option into the
* individual connection by passing a connect_option::ephemeral_routing_id option into the
* connect_sn/connect_remote method.
*/
bool PUBKEY_BASED_ROUTING_ID = true;
bool EPHEMERAL_ROUTING_ID = false;
/** Maximum incoming message size; if a remote tries sending a message larger than this they get
* disconnected. -1 means no limit. */
@ -498,7 +498,7 @@ private:
// provided then the connection will be curve25519 encrypted and authenticate; otherwise it will
// be unencrypted and unauthenticated. Note that the remote end must be in the same mode (i.e.
// either accepting curve connections, or not accepting curve).
void setup_outgoing_socket(zmq::socket_t& socket, std::string_view remote_pubkey, bool use_pubkey_routing_id);
void setup_outgoing_socket(zmq::socket_t& socket, std::string_view remote_pubkey, bool use_ephemeral_routing_id);
/// Common connection implementation used by proxy_connect/proxy_send. Returns the socket and,
/// if a routing prefix is needed, the required prefix (or an empty string if not needed). For
@ -514,10 +514,10 @@ private:
/// @param keep_alive the keep alive for the connection, if we establish a new outgoing
/// connection. If we already have an outgoing connection then its keep-alive gets increased to
/// this if currently less than this.
/// @param pubkey_routing whether or not to use a pubkey-based routing id
/// @param ephemeral_routing_id whether or not to use a random (true) or pubkey-based (false) routing id
std::pair<zmq::socket_t*, std::string> proxy_connect_sn(std::string_view pubkey,
std::string_view connect_hint, bool optional, bool incoming_only, bool outgoing_only,
bool pubkey_routing, std::chrono::milliseconds keep_alive);
bool ephemeral_routing_id, std::chrono::milliseconds keep_alive);
/// CONNECT_SN command telling us to connect to a new pubkey. Returns the socket (which could
/// be existing or a new one). This basically just unpacks arguments and passes them on to
@ -990,7 +990,8 @@ public:
* @param options - connection options; see the structs in `connect_option`, in particular:
* - keep_alive -- how long the SN connection will be kept alive after valid activity
* - remote_hint -- a remote address hint that may be used instead of doing a lookup
* - pubkey_routing -- allows you to override using our pubkey as a routing id
* - ephemeral_routing_id -- allows you to override the EPHEMERAL_ROUTING_ID option for
* this connection.
*
* For backwards compatibility you may also directly pass (as a `options` value):
* - a std::chrono::duration duration (equivalent to connect_option::keep_alive{duration})
@ -1386,15 +1387,15 @@ struct queue_full {
namespace connect_option {
/// Specifies whether the connection should use pubkey-based routing for this connection, overriding
/// the default (OxenMQ::PUBKEY_BASED_ROUTING_ID). See OxenMQ::PUBKEY_BASED_ROUTING_ID for a
/// description of this.
/// the default (OxenMQ::EPHEMERAL_ROUTING_ID). See OxenMQ::EPHEMERAL_ROUTING_ID for a description
/// of this.
///
/// Typically use: `connect_options::pubkey_routing{}` or `connect_options::pubkey_routing{false}`.
struct pubkey_routing {
bool use_pubkey_routing_id = true;
/// Typically use: `connect_options::ephemeral_routing_id{}` or `connect_options::ephemeral_routing_id{false}`.
struct ephemeral_routing_id {
bool use_ephemeral_routing_id = true;
// Constructor; default construction gives you pubkey routing, but the bool parameter can be
// specified as false to explicitly disable the pubkey routing flag.
explicit pubkey_routing(bool use = true) : use_pubkey_routing_id{use} {}
explicit ephemeral_routing_id(bool use = true) : use_ephemeral_routing_id{use} {}
};
/// Sets the connection timeout (instead of the default REMOTE_CONNECT_TIMEOUT). Only applies to
@ -1532,8 +1533,8 @@ inline void apply_connect_option(OxenMQ& omq, bool remote, bt_dict& opts, const
if (remote) opts["auth_level"] = static_cast<std::underlying_type_t<AuthLevel>>(auth);
else omq.log(LogLevel::warn, __FILE__, __LINE__, "AuthLevel ignored for connect_sn(...)");
}
inline void apply_connect_option(OxenMQ&, bool, bt_dict& opts, const connect_option::pubkey_routing& pkr) {
opts["pubkey_routing"] = pkr.use_pubkey_routing_id;
inline void apply_connect_option(OxenMQ&, bool, bt_dict& opts, const connect_option::ephemeral_routing_id& er) {
opts["ephemeral_rid"] = er.use_ephemeral_routing_id;
}
inline void apply_connect_option(OxenMQ& omq, bool remote, bt_dict& opts, const connect_option::timeout& timeout) {
if (remote) opts["timeout"] = timeout.time.count();
@ -1582,7 +1583,7 @@ template <typename... Option>
ConnectionID OxenMQ::connect_sn(std::string_view pubkey, const Option&... options) {
bt_dict opts{
{"keep_alive", std::chrono::microseconds{DEFAULT_CONNECT_SN_KEEP_ALIVE}.count()},
{"pubkey_routing", PUBKEY_BASED_ROUTING_ID},
{"ephemeral_rid", EPHEMERAL_ROUTING_ID},
};
(detail::apply_connect_option(*this, false, opts, options), ...);

View File

@ -116,7 +116,7 @@ void OxenMQ::proxy_send(bt_dict_consumer data) {
retry = false;
zmq::socket_t *send_to;
if (conn_id.sn()) {
auto sock_route = proxy_connect_sn(conn_id.pk, hint, optional, incoming, outgoing, PUBKEY_BASED_ROUTING_ID, keep_alive);
auto sock_route = proxy_connect_sn(conn_id.pk, hint, optional, incoming, outgoing, EPHEMERAL_ROUTING_ID, keep_alive);
if (!sock_route.first) {
nowarn = true;
if (optional)

View File

@ -104,7 +104,7 @@ TEST_CASE("outgoing auth level", "[commands][auth]") {
client.add_command("admin", "hi", [&](auto&) { admin_hi++; });
client.start();
client.PUBKEY_BASED_ROUTING_ID = false; // establishing multiple connections below, so we need unique routing ids
client.EPHEMERAL_ROUTING_ID = true; // establishing multiple connections below, so we need unique routing ids
address server_addr{listen, server.get_pubkey()};
auto public_c = client.connect_remote(server_addr, [](auto&&...) {}, [](auto&&...) {});

View File

@ -107,7 +107,7 @@ TEST_CASE("plain-text connections", "[plaintext][connect]") {
std::atomic<bool> got{false};
bool success = false;
auto c = client.connect_remote(listen,
auto c = client.connect_remote(address{listen},
[&](auto conn) { success = true; got = true; },
[&](auto conn, std::string_view reason) { auto lock = catch_lock(); INFO("connection failed: " << reason); got = true; }
);
@ -149,11 +149,11 @@ TEST_CASE("unique connection IDs", "[connect][id]") {
client2.start();
std::atomic<bool> good1{false}, good2{false};
auto r1 = client1.connect_remote(listen,
auto r1 = client1.connect_remote(address{listen},
[&](auto conn) { good1 = true; },
[&](auto conn, std::string_view reason) { auto lock = catch_lock(); INFO("connection failed: " << reason); }
);
auto r2 = client2.connect_remote(listen,
auto r2 = client2.connect_remote(address{listen},
[&](auto conn) { good2 = true; },
[&](auto conn, std::string_view reason) { auto lock = catch_lock(); INFO("connection failed: " << reason); }
);
@ -370,7 +370,7 @@ TEST_CASE("SN single worker test", "[connect][worker]") {
OxenMQ client{get_logger(""), LogLevel::trace};
client.start();
auto conn = client.connect_remote(listen, [](auto) {}, [](auto, auto) {});
auto conn = client.connect_remote(address{listen}, [](auto) {}, [](auto, auto) {});
std::atomic<int> got{0};
std::atomic<int> success{0};