Add ZMTP heartbeating (enabled by default)

ZMTP heartbeating should help keep the connection alive, and should
result in earlier detection of connection failures.
This commit is contained in:
Jason Rhinelander 2020-04-14 16:06:01 -03:00
parent b081cf9331
commit 7de36da483
3 changed files with 28 additions and 0 deletions

View File

@ -43,6 +43,12 @@ void LokiMQ::setup_outgoing_socket(zmq::socket_t& socket, string_view remote_pub
}
socket.setsockopt(ZMQ_HANDSHAKE_IVL, (int) HANDSHAKE_TIME.count());
socket.setsockopt<int64_t>(ZMQ_MAXMSGSIZE, MAX_MSG_SIZE);
if (CONN_HEARTBEAT > 0s) {
socket.setsockopt(ZMQ_HEARTBEAT_IVL, (int) CONN_HEARTBEAT.count());
if (CONN_HEARTBEAT_TIMEOUT > 0s)
socket.setsockopt(ZMQ_HEARTBEAT_TIMEOUT, (int) CONN_HEARTBEAT_TIMEOUT.count());
}
if (PUBKEY_BASED_ROUTING_ID) {
std::string routing_id;
routing_id.reserve(33);

View File

@ -208,6 +208,23 @@ public:
*/
std::chrono::milliseconds CONN_CHECK_INTERVAL = 250ms;
/** Whether to enable heartbeats on incoming/outgoing connections. If set to > 0 then we set up
* ZMQ to send a heartbeat ping over the socket this often, which helps keep the connection
* alive and lets failed connections be detected sooner (see the next option).
*
* Only new connections created after changing this are affected, so if changing it is
* recommended to set it before calling `start()`.
*/
std::chrono::milliseconds CONN_HEARTBEAT = 3s;
/** When CONN_HEARTBEAT is enabled, this sets how long we wait for a reply on a socket before
* considering the socket to have died and closing it.
*
* Only new connections created after changing this are affected, so if changing it is
* recommended to set it before calling `start()`.
*/
std::chrono::milliseconds CONN_HEARTBEAT_TIMEOUT = 30s;
/// Allows you to set options on the internal zmq context object. For advanced use only.
int set_zmq_context_option(int option, int value);

View File

@ -333,6 +333,11 @@ void LokiMQ::proxy_loop() {
listener.setsockopt(ZMQ_CURVE_SECRETKEY, privkey.data(), privkey.size());
}
listener.setsockopt(ZMQ_HANDSHAKE_IVL, (int) HANDSHAKE_TIME.count());
if (CONN_HEARTBEAT > 0s) {
listener.setsockopt(ZMQ_HEARTBEAT_IVL, (int) CONN_HEARTBEAT.count());
if (CONN_HEARTBEAT_TIMEOUT > 0s)
listener.setsockopt(ZMQ_HEARTBEAT_TIMEOUT, (int) CONN_HEARTBEAT_TIMEOUT.count());
}
listener.setsockopt<int64_t>(ZMQ_MAXMSGSIZE, MAX_MSG_SIZE);
listener.setsockopt<int>(ZMQ_ROUTER_HANDOVER, 1);
listener.setsockopt<int>(ZMQ_ROUTER_MANDATORY, 1);