diff --git a/lokimq/connections.cpp b/lokimq/connections.cpp index 2249875..9ab1f5f 100644 --- a/lokimq/connections.cpp +++ b/lokimq/connections.cpp @@ -35,12 +35,9 @@ void LokiMQ::rebuild_pollitems() { pollitems_stale = false; } -void LokiMQ::setup_outgoing_socket(zmq::socket_t& socket, string_view remote_pubkey) { - if (!remote_pubkey.empty()) { - socket.setsockopt(ZMQ_CURVE_SERVERKEY, remote_pubkey.data(), remote_pubkey.size()); - socket.setsockopt(ZMQ_CURVE_PUBLICKEY, pubkey.data(), pubkey.size()); - socket.setsockopt(ZMQ_CURVE_SECRETKEY, privkey.data(), privkey.size()); - } +void LokiMQ::setup_external_socket(zmq::socket_t& socket) { + socket.setsockopt(ZMQ_RECONNECT_IVL, (int) RECONNECT_INTERVAL.count()); + socket.setsockopt(ZMQ_RECONNECT_IVL_MAX, (int) RECONNECT_INTERVAL_MAX.count()); socket.setsockopt(ZMQ_HANDSHAKE_IVL, (int) HANDSHAKE_TIME.count()); socket.setsockopt(ZMQ_MAXMSGSIZE, MAX_MSG_SIZE); if (CONN_HEARTBEAT > 0s) { @@ -48,6 +45,17 @@ void LokiMQ::setup_outgoing_socket(zmq::socket_t& socket, string_view remote_pub if (CONN_HEARTBEAT_TIMEOUT > 0s) socket.setsockopt(ZMQ_HEARTBEAT_TIMEOUT, (int) CONN_HEARTBEAT_TIMEOUT.count()); } +} + +void LokiMQ::setup_outgoing_socket(zmq::socket_t& socket, string_view remote_pubkey) { + + setup_external_socket(socket); + + if (!remote_pubkey.empty()) { + socket.setsockopt(ZMQ_CURVE_SERVERKEY, remote_pubkey.data(), remote_pubkey.size()); + socket.setsockopt(ZMQ_CURVE_PUBLICKEY, pubkey.data(), pubkey.size()); + socket.setsockopt(ZMQ_CURVE_SECRETKEY, privkey.data(), privkey.size()); + } if (PUBKEY_BASED_ROUTING_ID) { std::string routing_id; diff --git a/lokimq/lokimq.h b/lokimq/lokimq.h index 909d731..9c66e3a 100644 --- a/lokimq/lokimq.h +++ b/lokimq/lokimq.h @@ -196,6 +196,18 @@ public: * disconnected. -1 means no limit. */ int64_t MAX_MSG_SIZE = 1 * 1024 * 1024; + /** Minimum reconnect interval: when a connection fails or dies, wait this long before + * attempting to reconnect. (ZMQ may randomize the value somewhat to avoid reconnection + * storms). See RECONNECT_INTERVAL_MAX as well. The LokiMQ default is 250ms. + */ + std::chrono::milliseconds RECONNECT_INTERVAL = 250ms; + + /** Maximum reconnect interval. When this is set to a value larger than RECONNECT_INTERVAL then + * ZMQ's reconnection logic uses an exponential backoff: each reconnection attempts waits twice + * as long as the previous attempt, up to this maximum. The LokiMQ default is 5 seconds. + */ + std::chrono::milliseconds RECONNECT_INTERVAL_MAX = 5s; + /** How long (in ms) to linger sockets when closing them; this is the maximum time zmq spends * trying to sending pending messages before dropping them and closing the underlying socket * after the high-level zmq socket is closed. */ @@ -431,10 +443,8 @@ private: /// gets called after all works have done so. void proxy_quit(); - // Sets the various properties for a listening socket prior to binding. If curve is true then - // the socket is set up using the keys and incoming connections must already know the pubkey to - // establish a connection; otherwise the connection is plaintext without authentication. - void setup_listening_socket(zmq::socket_t& socket, bool curve); + // Common setup code for setting up an external (incoming or outgoing) socket. + void setup_external_socket(zmq::socket_t& socket); // Sets the various properties on an outgoing socket prior to connection. If remote_pubkey is // provided then the connection will be curve25519 encrypted and authenticate; otherwise it will diff --git a/lokimq/proxy.cpp b/lokimq/proxy.cpp index 4672ccc..6b22318 100644 --- a/lokimq/proxy.cpp +++ b/lokimq/proxy.cpp @@ -326,19 +326,13 @@ void LokiMQ::proxy_loop() { zmq::socket_t listener{context, zmq::socket_type::router}; std::string auth_domain = bt_serialize(i); + setup_external_socket(listener); listener.setsockopt(ZMQ_ZAP_DOMAIN, auth_domain.c_str(), auth_domain.size()); if (b.curve) { listener.setsockopt(ZMQ_CURVE_SERVER, 1); listener.setsockopt(ZMQ_CURVE_PUBLICKEY, pubkey.data(), pubkey.size()); listener.setsockopt(ZMQ_CURVE_SECRETKEY, privkey.data(), privkey.size()); } - listener.setsockopt(ZMQ_HANDSHAKE_IVL, (int) HANDSHAKE_TIME.count()); - if (CONN_HEARTBEAT > 0s) { - listener.setsockopt(ZMQ_HEARTBEAT_IVL, (int) CONN_HEARTBEAT.count()); - if (CONN_HEARTBEAT_TIMEOUT > 0s) - listener.setsockopt(ZMQ_HEARTBEAT_TIMEOUT, (int) CONN_HEARTBEAT_TIMEOUT.count()); - } - listener.setsockopt(ZMQ_MAXMSGSIZE, MAX_MSG_SIZE); listener.setsockopt(ZMQ_ROUTER_HANDOVER, 1); listener.setsockopt(ZMQ_ROUTER_MANDATORY, 1);