From 34bbaaf6125aac691ac44a6f7607a8fa2bd72616 Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Fri, 17 Apr 2020 16:09:53 -0300 Subject: [PATCH] Use slower and exponential backoff in reconnection ZMQ's default reconnection time is 100ms, indefinitely, which seems far too aggressive, particularly where we have some potential for hundreds or thousands of connections. This changes the default to be slightly slower (250ms instead of 100ms) on the first attempt, and to use exponential backoff doubling the time between each failed connection attempt up to a max of 5s between reconnection attempts to calm things down. --- lokimq/connections.cpp | 20 ++++++++++++++------ lokimq/lokimq.h | 18 ++++++++++++++---- lokimq/proxy.cpp | 8 +------- 3 files changed, 29 insertions(+), 17 deletions(-) diff --git a/lokimq/connections.cpp b/lokimq/connections.cpp index 2249875..9ab1f5f 100644 --- a/lokimq/connections.cpp +++ b/lokimq/connections.cpp @@ -35,12 +35,9 @@ void LokiMQ::rebuild_pollitems() { pollitems_stale = false; } -void LokiMQ::setup_outgoing_socket(zmq::socket_t& socket, string_view remote_pubkey) { - if (!remote_pubkey.empty()) { - socket.setsockopt(ZMQ_CURVE_SERVERKEY, remote_pubkey.data(), remote_pubkey.size()); - socket.setsockopt(ZMQ_CURVE_PUBLICKEY, pubkey.data(), pubkey.size()); - socket.setsockopt(ZMQ_CURVE_SECRETKEY, privkey.data(), privkey.size()); - } +void LokiMQ::setup_external_socket(zmq::socket_t& socket) { + socket.setsockopt(ZMQ_RECONNECT_IVL, (int) RECONNECT_INTERVAL.count()); + socket.setsockopt(ZMQ_RECONNECT_IVL_MAX, (int) RECONNECT_INTERVAL_MAX.count()); socket.setsockopt(ZMQ_HANDSHAKE_IVL, (int) HANDSHAKE_TIME.count()); socket.setsockopt(ZMQ_MAXMSGSIZE, MAX_MSG_SIZE); if (CONN_HEARTBEAT > 0s) { @@ -48,6 +45,17 @@ void LokiMQ::setup_outgoing_socket(zmq::socket_t& socket, string_view remote_pub if (CONN_HEARTBEAT_TIMEOUT > 0s) socket.setsockopt(ZMQ_HEARTBEAT_TIMEOUT, (int) CONN_HEARTBEAT_TIMEOUT.count()); } +} + +void LokiMQ::setup_outgoing_socket(zmq::socket_t& socket, string_view remote_pubkey) { + + setup_external_socket(socket); + + if (!remote_pubkey.empty()) { + socket.setsockopt(ZMQ_CURVE_SERVERKEY, remote_pubkey.data(), remote_pubkey.size()); + socket.setsockopt(ZMQ_CURVE_PUBLICKEY, pubkey.data(), pubkey.size()); + socket.setsockopt(ZMQ_CURVE_SECRETKEY, privkey.data(), privkey.size()); + } if (PUBKEY_BASED_ROUTING_ID) { std::string routing_id; diff --git a/lokimq/lokimq.h b/lokimq/lokimq.h index 909d731..9c66e3a 100644 --- a/lokimq/lokimq.h +++ b/lokimq/lokimq.h @@ -196,6 +196,18 @@ public: * disconnected. -1 means no limit. */ int64_t MAX_MSG_SIZE = 1 * 1024 * 1024; + /** Minimum reconnect interval: when a connection fails or dies, wait this long before + * attempting to reconnect. (ZMQ may randomize the value somewhat to avoid reconnection + * storms). See RECONNECT_INTERVAL_MAX as well. The LokiMQ default is 250ms. + */ + std::chrono::milliseconds RECONNECT_INTERVAL = 250ms; + + /** Maximum reconnect interval. When this is set to a value larger than RECONNECT_INTERVAL then + * ZMQ's reconnection logic uses an exponential backoff: each reconnection attempts waits twice + * as long as the previous attempt, up to this maximum. The LokiMQ default is 5 seconds. + */ + std::chrono::milliseconds RECONNECT_INTERVAL_MAX = 5s; + /** How long (in ms) to linger sockets when closing them; this is the maximum time zmq spends * trying to sending pending messages before dropping them and closing the underlying socket * after the high-level zmq socket is closed. */ @@ -431,10 +443,8 @@ private: /// gets called after all works have done so. void proxy_quit(); - // Sets the various properties for a listening socket prior to binding. If curve is true then - // the socket is set up using the keys and incoming connections must already know the pubkey to - // establish a connection; otherwise the connection is plaintext without authentication. - void setup_listening_socket(zmq::socket_t& socket, bool curve); + // Common setup code for setting up an external (incoming or outgoing) socket. + void setup_external_socket(zmq::socket_t& socket); // Sets the various properties on an outgoing socket prior to connection. If remote_pubkey is // provided then the connection will be curve25519 encrypted and authenticate; otherwise it will diff --git a/lokimq/proxy.cpp b/lokimq/proxy.cpp index 4672ccc..6b22318 100644 --- a/lokimq/proxy.cpp +++ b/lokimq/proxy.cpp @@ -326,19 +326,13 @@ void LokiMQ::proxy_loop() { zmq::socket_t listener{context, zmq::socket_type::router}; std::string auth_domain = bt_serialize(i); + setup_external_socket(listener); listener.setsockopt(ZMQ_ZAP_DOMAIN, auth_domain.c_str(), auth_domain.size()); if (b.curve) { listener.setsockopt(ZMQ_CURVE_SERVER, 1); listener.setsockopt(ZMQ_CURVE_PUBLICKEY, pubkey.data(), pubkey.size()); listener.setsockopt(ZMQ_CURVE_SECRETKEY, privkey.data(), privkey.size()); } - listener.setsockopt(ZMQ_HANDSHAKE_IVL, (int) HANDSHAKE_TIME.count()); - if (CONN_HEARTBEAT > 0s) { - listener.setsockopt(ZMQ_HEARTBEAT_IVL, (int) CONN_HEARTBEAT.count()); - if (CONN_HEARTBEAT_TIMEOUT > 0s) - listener.setsockopt(ZMQ_HEARTBEAT_TIMEOUT, (int) CONN_HEARTBEAT_TIMEOUT.count()); - } - listener.setsockopt(ZMQ_MAXMSGSIZE, MAX_MSG_SIZE); listener.setsockopt(ZMQ_ROUTER_HANDOVER, 1); listener.setsockopt(ZMQ_ROUTER_MANDATORY, 1);