Use slower and exponential backoff in reconnection

ZMQ's default is to retry a failed connection every 100ms, indefinitely,
which seems far too aggressive, particularly where we have the potential
for hundreds or thousands of connections.

This changes the default to be slightly slower on the first attempt
(250ms instead of 100ms) and enables exponential backoff: the delay
doubles after each failed connection attempt, up to a maximum of 5s
between reconnection attempts, to calm things down.
This commit is contained in:
Jason Rhinelander 2020-04-17 16:09:53 -03:00
parent b2518b8eb3
commit 34bbaaf612
3 changed files with 29 additions and 17 deletions

View File

@ -35,12 +35,9 @@ void LokiMQ::rebuild_pollitems() {
pollitems_stale = false;
}
void LokiMQ::setup_outgoing_socket(zmq::socket_t& socket, string_view remote_pubkey) {
if (!remote_pubkey.empty()) {
socket.setsockopt(ZMQ_CURVE_SERVERKEY, remote_pubkey.data(), remote_pubkey.size());
socket.setsockopt(ZMQ_CURVE_PUBLICKEY, pubkey.data(), pubkey.size());
socket.setsockopt(ZMQ_CURVE_SECRETKEY, privkey.data(), privkey.size());
}
void LokiMQ::setup_external_socket(zmq::socket_t& socket) {
socket.setsockopt(ZMQ_RECONNECT_IVL, (int) RECONNECT_INTERVAL.count());
socket.setsockopt(ZMQ_RECONNECT_IVL_MAX, (int) RECONNECT_INTERVAL_MAX.count());
socket.setsockopt(ZMQ_HANDSHAKE_IVL, (int) HANDSHAKE_TIME.count());
socket.setsockopt<int64_t>(ZMQ_MAXMSGSIZE, MAX_MSG_SIZE);
if (CONN_HEARTBEAT > 0s) {
@ -48,6 +45,17 @@ void LokiMQ::setup_outgoing_socket(zmq::socket_t& socket, string_view remote_pub
if (CONN_HEARTBEAT_TIMEOUT > 0s)
socket.setsockopt(ZMQ_HEARTBEAT_TIMEOUT, (int) CONN_HEARTBEAT_TIMEOUT.count());
}
}
void LokiMQ::setup_outgoing_socket(zmq::socket_t& socket, string_view remote_pubkey) {
setup_external_socket(socket);
if (!remote_pubkey.empty()) {
socket.setsockopt(ZMQ_CURVE_SERVERKEY, remote_pubkey.data(), remote_pubkey.size());
socket.setsockopt(ZMQ_CURVE_PUBLICKEY, pubkey.data(), pubkey.size());
socket.setsockopt(ZMQ_CURVE_SECRETKEY, privkey.data(), privkey.size());
}
if (PUBKEY_BASED_ROUTING_ID) {
std::string routing_id;

View File

@ -196,6 +196,18 @@ public:
* disconnected. -1 means no limit. */
int64_t MAX_MSG_SIZE = 1 * 1024 * 1024;
/** Minimum reconnect interval: when a connection fails or dies, wait this long before
* attempting to reconnect. (ZMQ may randomize the value somewhat to avoid reconnection
* storms). See RECONNECT_INTERVAL_MAX as well. The LokiMQ default is 250ms.
*/
std::chrono::milliseconds RECONNECT_INTERVAL = 250ms;
/** Maximum reconnect interval. When this is set to a value larger than RECONNECT_INTERVAL then
* ZMQ's reconnection logic uses an exponential backoff: each reconnection attempt waits twice
* as long as the previous attempt, up to this maximum. The LokiMQ default is 5 seconds.
*/
std::chrono::milliseconds RECONNECT_INTERVAL_MAX = 5s;
/** How long (in ms) to linger sockets when closing them; this is the maximum time zmq spends
* trying to send pending messages before dropping them and closing the underlying socket
* after the high-level zmq socket is closed. */
@ -431,10 +443,8 @@ private:
/// gets called after all workers have done so.
void proxy_quit();
// Sets the various properties for a listening socket prior to binding. If curve is true then
// the socket is set up using the keys and incoming connections must already know the pubkey to
// establish a connection; otherwise the connection is plaintext without authentication.
void setup_listening_socket(zmq::socket_t& socket, bool curve);
// Common setup code for setting up an external (incoming or outgoing) socket.
void setup_external_socket(zmq::socket_t& socket);
// Sets the various properties on an outgoing socket prior to connection. If remote_pubkey is
// provided then the connection will be curve25519 encrypted and authenticated; otherwise it will

View File

@ -326,19 +326,13 @@ void LokiMQ::proxy_loop() {
zmq::socket_t listener{context, zmq::socket_type::router};
std::string auth_domain = bt_serialize(i);
setup_external_socket(listener);
listener.setsockopt(ZMQ_ZAP_DOMAIN, auth_domain.c_str(), auth_domain.size());
if (b.curve) {
listener.setsockopt<int>(ZMQ_CURVE_SERVER, 1);
listener.setsockopt(ZMQ_CURVE_PUBLICKEY, pubkey.data(), pubkey.size());
listener.setsockopt(ZMQ_CURVE_SECRETKEY, privkey.data(), privkey.size());
}
listener.setsockopt(ZMQ_HANDSHAKE_IVL, (int) HANDSHAKE_TIME.count());
if (CONN_HEARTBEAT > 0s) {
listener.setsockopt(ZMQ_HEARTBEAT_IVL, (int) CONN_HEARTBEAT.count());
if (CONN_HEARTBEAT_TIMEOUT > 0s)
listener.setsockopt(ZMQ_HEARTBEAT_TIMEOUT, (int) CONN_HEARTBEAT_TIMEOUT.count());
}
listener.setsockopt<int64_t>(ZMQ_MAXMSGSIZE, MAX_MSG_SIZE);
listener.setsockopt<int>(ZMQ_ROUTER_HANDOVER, 1);
listener.setsockopt<int>(ZMQ_ROUTER_MANDATORY, 1);