From 7de36da483d1c4f3babd9caf36312edfe784ab57 Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Tue, 14 Apr 2020 16:06:01 -0300 Subject: [PATCH] Add ZMTP heartbeating (enabled by default) ZMTP heartbeating should help keep the connection alive, and should result in earlier detection of connection failures. --- lokimq/connections.cpp | 6 ++++++ lokimq/lokimq.h | 17 +++++++++++++++++ lokimq/proxy.cpp | 5 +++++ 3 files changed, 28 insertions(+) diff --git a/lokimq/connections.cpp b/lokimq/connections.cpp index 2a7f112..ae29685 100644 --- a/lokimq/connections.cpp +++ b/lokimq/connections.cpp @@ -43,6 +43,12 @@ void LokiMQ::setup_outgoing_socket(zmq::socket_t& socket, string_view remote_pub } socket.setsockopt(ZMQ_HANDSHAKE_IVL, (int) HANDSHAKE_TIME.count()); socket.setsockopt(ZMQ_MAXMSGSIZE, MAX_MSG_SIZE); + if (CONN_HEARTBEAT > 0s) { + socket.setsockopt(ZMQ_HEARTBEAT_IVL, (int) CONN_HEARTBEAT.count()); + if (CONN_HEARTBEAT_TIMEOUT > 0s) + socket.setsockopt(ZMQ_HEARTBEAT_TIMEOUT, (int) CONN_HEARTBEAT_TIMEOUT.count()); + } + if (PUBKEY_BASED_ROUTING_ID) { std::string routing_id; routing_id.reserve(33); diff --git a/lokimq/lokimq.h b/lokimq/lokimq.h index 17a0ad4..909d731 100644 --- a/lokimq/lokimq.h +++ b/lokimq/lokimq.h @@ -208,6 +208,23 @@ public: */ std::chrono::milliseconds CONN_CHECK_INTERVAL = 250ms; + /** Whether to enable heartbeats on incoming/outgoing connections. If set to > 0 then we set up + * ZMQ to send a heartbeat ping over the socket this often, which helps keep the connection + * alive and lets failed connections be detected sooner (see the next option). + * + * Only new connections created after changing this are affected, so if changing it is + * recommended to set it before calling `start()`. + */ + std::chrono::milliseconds CONN_HEARTBEAT = 3s; + + /** When CONN_HEARTBEAT is enabled, this sets how long we wait for a reply on a socket before + * considering the socket to have died and closing it. + * + * Only new connections created after changing this are affected, so if changing it is + * recommended to set it before calling `start()`. + */ + std::chrono::milliseconds CONN_HEARTBEAT_TIMEOUT = 30s; + /// Allows you to set options on the internal zmq context object. For advanced use only. int set_zmq_context_option(int option, int value); diff --git a/lokimq/proxy.cpp b/lokimq/proxy.cpp index 61671f4..bbd7517 100644 --- a/lokimq/proxy.cpp +++ b/lokimq/proxy.cpp @@ -333,6 +333,11 @@ void LokiMQ::proxy_loop() { listener.setsockopt(ZMQ_CURVE_SECRETKEY, privkey.data(), privkey.size()); } listener.setsockopt(ZMQ_HANDSHAKE_IVL, (int) HANDSHAKE_TIME.count()); + if (CONN_HEARTBEAT > 0s) { + listener.setsockopt(ZMQ_HEARTBEAT_IVL, (int) CONN_HEARTBEAT.count()); + if (CONN_HEARTBEAT_TIMEOUT > 0s) + listener.setsockopt(ZMQ_HEARTBEAT_TIMEOUT, (int) CONN_HEARTBEAT_TIMEOUT.count()); + } listener.setsockopt(ZMQ_MAXMSGSIZE, MAX_MSG_SIZE); listener.setsockopt(ZMQ_ROUTER_HANDOVER, 1); listener.setsockopt(ZMQ_ROUTER_MANDATORY, 1);