From d889f308ae81f857fd34753f579575e07e0ce0e8 Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Tue, 10 Nov 2020 17:47:31 -0400 Subject: [PATCH] cppzmq 4.7+ compatibility Updates bundled cppzmq to 4.7.1, and replaces deprecated functions with new API. --- cppzmq | 2 +- lokimq/connections.cpp | 22 +++++++++++----------- lokimq/lokimq.cpp | 13 +++++++------ lokimq/lokimq.h | 2 +- lokimq/proxy.cpp | 23 +++++++++++------------ lokimq/worker.cpp | 6 +++--- 6 files changed, 34 insertions(+), 34 deletions(-) diff --git a/cppzmq b/cppzmq index 8d5c9a8..76bf169 160000 --- a/cppzmq +++ b/cppzmq @@ -1 +1 @@ -Subproject commit 8d5c9a88988dcbebb72939ca0939d432230ffde1 +Subproject commit 76bf169fd67b8e99c1b0e6490029d9cd5ef97666 diff --git a/lokimq/connections.cpp b/lokimq/connections.cpp index 3cc9f8e..92e106c 100644 --- a/lokimq/connections.cpp +++ b/lokimq/connections.cpp @@ -36,14 +36,14 @@ void LokiMQ::rebuild_pollitems() { } void LokiMQ::setup_external_socket(zmq::socket_t& socket) { - socket.setsockopt(ZMQ_RECONNECT_IVL, (int) RECONNECT_INTERVAL.count()); - socket.setsockopt(ZMQ_RECONNECT_IVL_MAX, (int) RECONNECT_INTERVAL_MAX.count()); - socket.setsockopt(ZMQ_HANDSHAKE_IVL, (int) HANDSHAKE_TIME.count()); - socket.setsockopt(ZMQ_MAXMSGSIZE, MAX_MSG_SIZE); + socket.set(zmq::sockopt::reconnect_ivl, (int) RECONNECT_INTERVAL.count()); + socket.set(zmq::sockopt::reconnect_ivl_max, (int) RECONNECT_INTERVAL_MAX.count()); + socket.set(zmq::sockopt::handshake_ivl, (int) HANDSHAKE_TIME.count()); + socket.set(zmq::sockopt::maxmsgsize, MAX_MSG_SIZE); if (CONN_HEARTBEAT > 0s) { - socket.setsockopt(ZMQ_HEARTBEAT_IVL, (int) CONN_HEARTBEAT.count()); + socket.set(zmq::sockopt::heartbeat_ivl, (int) CONN_HEARTBEAT.count()); if (CONN_HEARTBEAT_TIMEOUT > 0s) - socket.setsockopt(ZMQ_HEARTBEAT_TIMEOUT, (int) CONN_HEARTBEAT_TIMEOUT.count()); + socket.set(zmq::sockopt::heartbeat_timeout, (int) CONN_HEARTBEAT_TIMEOUT.count()); } } @@ -52,9 +52,9 @@ void LokiMQ::setup_outgoing_socket(zmq::socket_t& socket, std::string_view remot setup_external_socket(socket); if (!remote_pubkey.empty()) { - socket.setsockopt(ZMQ_CURVE_SERVERKEY, remote_pubkey.data(), remote_pubkey.size()); - socket.setsockopt(ZMQ_CURVE_PUBLICKEY, pubkey.data(), pubkey.size()); - socket.setsockopt(ZMQ_CURVE_SECRETKEY, privkey.data(), privkey.size()); + socket.set(zmq::sockopt::curve_serverkey, remote_pubkey); + socket.set(zmq::sockopt::curve_publickey, pubkey); + socket.set(zmq::sockopt::curve_secretkey, privkey); } if (PUBKEY_BASED_ROUTING_ID) { @@ -62,7 +62,7 @@ void LokiMQ::setup_outgoing_socket(zmq::socket_t& socket, std::string_view remot routing_id.reserve(33); routing_id += 'L'; // Prefix because routing id's starting with \0 are reserved by zmq (and our pubkey might start with \0) routing_id.append(pubkey.begin(), pubkey.end()); - socket.setsockopt(ZMQ_ROUTING_ID, routing_id.data(), routing_id.size()); + socket.set(zmq::sockopt::routing_id, routing_id); } // else let ZMQ pick a random one } @@ -227,7 +227,7 @@ void update_connection_indices(Container& c, size_t index, AccessIndex get_index /// which can invalidate iterators on the various connection containers - if you don't want that, /// delete it first so that the container won't contain the element being deleted. void LokiMQ::proxy_close_connection(size_t index, std::chrono::milliseconds linger) { - connections[index].setsockopt(ZMQ_LINGER, linger > 0ms ? linger.count() : 0); + connections[index].set(zmq::sockopt::linger, linger > 0ms ? (int) linger.count() : 0); pollitems_stale = true; connections.erase(connections.begin() + index); diff --git a/lokimq/lokimq.cpp b/lokimq/lokimq.cpp index e5ceb03..7c8f1d5 100644 --- a/lokimq/lokimq.cpp +++ b/lokimq/lokimq.cpp @@ -1,5 +1,6 @@ #include "lokimq.h" #include "lokimq-internal.h" +#include "zmq.hpp" #include #include #include @@ -74,8 +75,8 @@ std::pair extract_metadata(zmq::message_t& msg) { } // namespace detail -int LokiMQ::set_zmq_context_option(int option, int value) { - return context.setctxopt(option, value); +void LokiMQ::set_zmq_context_option(zmq::ctxopt option, int value) { + context.set(option, value); } void LokiMQ::log_level(LogLevel level) { @@ -181,7 +182,7 @@ zmq::socket_t& LokiMQ::get_control_socket() { if (proxy_shutting_down) throw std::runtime_error("Unable to obtain LokiMQ control socket: proxy thread is shutting down"); auto control = std::make_shared(context, zmq::socket_type::dealer); - control->setsockopt(ZMQ_LINGER, 0); + control->set(zmq::sockopt::linger, 0); control->connect(SN_ADDR_COMMAND); thread_control_sockets.push_back(control); control_sockets.emplace(object_id, control); @@ -243,9 +244,9 @@ void LokiMQ::start() { LMQ_LOG(info, "Initializing LokiMQ ", bind.empty() ? "remote-only" : "listener", " with pubkey ", to_hex(pubkey)); - int zmq_socket_limit = context.getctxopt(ZMQ_SOCKET_LIMIT); + int zmq_socket_limit = context.get(zmq::ctxopt::socket_limit); if (MAX_SOCKETS > 1 && MAX_SOCKETS <= zmq_socket_limit) - context.setctxopt(ZMQ_MAX_SOCKETS, MAX_SOCKETS); + context.set(zmq::ctxopt::max_sockets, MAX_SOCKETS); else LMQ_LOG(error, "Not applying LokiMQ::MAX_SOCKETS setting: ", MAX_SOCKETS, " must be in [1, ", zmq_socket_limit, "]"); @@ -400,7 +401,7 @@ LokiMQ::~LokiMQ() { // proxy thread starts (and we're getting destructed here without a proxy thread). So // we need to start listening on it here in the destructor so that we establish a // connection and send the QUITs to the tagged worker threads. - workers_socket.setsockopt(ZMQ_ROUTER_MANDATORY, 1); + workers_socket.set(zmq::sockopt::router_mandatory, true); workers_socket.bind(SN_ADDR_WORKERS); for (auto& [run, busy, queue] : tagged_workers) { while (true) { diff --git a/lokimq/lokimq.h b/lokimq/lokimq.h index 0bac888..29d4a53 100644 --- a/lokimq/lokimq.h +++ b/lokimq/lokimq.h @@ -257,7 +257,7 @@ public: std::chrono::milliseconds CONN_HEARTBEAT_TIMEOUT = 30s; /// Allows you to set options on the internal zmq context object. For advanced use only. - int set_zmq_context_option(int option, int value); + void set_zmq_context_option(zmq::ctxopt option, int value); /** The umask to apply when constructing sockets (which affects any new ipc:// listening sockets * that get created). Does nothing if set to -1 (the default), and does nothing on Windows. diff --git a/lokimq/proxy.cpp b/lokimq/proxy.cpp index ec5cb73..3b78fcd 100644 --- a/lokimq/proxy.cpp +++ b/lokimq/proxy.cpp @@ -25,7 +25,7 @@ void LokiMQ::proxy_quit() { assert(std::none_of(workers.begin(), workers.end(), [](auto& worker) { return worker.worker_thread.joinable(); })); assert(std::none_of(tagged_workers.begin(), tagged_workers.end(), [](auto& worker) { return std::get<0>(worker).worker_thread.joinable(); })); - command.setsockopt(ZMQ_LINGER, 0); + command.set(zmq::sockopt::linger, 0); command.close(); { std::lock_guard lock{control_sockets_mutex}; @@ -36,7 +36,7 @@ void LokiMQ::proxy_quit() { workers_socket.close(); int linger = std::chrono::milliseconds{CLOSE_LINGER}.count(); for (auto& s : connections) - s.setsockopt(ZMQ_LINGER, linger); + s.set(zmq::sockopt::linger, linger); connections.clear(); peers.clear(); @@ -322,10 +322,10 @@ void LokiMQ::proxy_loop() { pthread_setname_np("lmq-proxy"); #endif - zap_auth.setsockopt(ZMQ_LINGER, 0); + zap_auth.set(zmq::sockopt::linger, 0); zap_auth.bind(ZMQ_ADDR_ZAP); - workers_socket.setsockopt(ZMQ_ROUTER_MANDATORY, 1); + workers_socket.set(zmq::sockopt::router_mandatory, true); workers_socket.bind(SN_ADDR_WORKERS); assert(general_workers > 0); @@ -362,16 +362,15 @@ void LokiMQ::proxy_loop() { auto& b = bind[i].second; zmq::socket_t listener{context, zmq::socket_type::router}; - std::string auth_domain = bt_serialize(i); setup_external_socket(listener); - listener.setsockopt(ZMQ_ZAP_DOMAIN, auth_domain.c_str(), auth_domain.size()); + listener.set(zmq::sockopt::zap_domain, bt_serialize(i)); if (b.curve) { - listener.setsockopt(ZMQ_CURVE_SERVER, 1); - listener.setsockopt(ZMQ_CURVE_PUBLICKEY, pubkey.data(), pubkey.size()); - listener.setsockopt(ZMQ_CURVE_SECRETKEY, privkey.data(), privkey.size()); + listener.set(zmq::sockopt::curve_server, true); + listener.set(zmq::sockopt::curve_publickey, pubkey); + listener.set(zmq::sockopt::curve_secretkey, privkey); } - listener.setsockopt(ZMQ_ROUTER_HANDOVER, 1); - listener.setsockopt(ZMQ_ROUTER_MANDATORY, 1); + listener.set(zmq::sockopt::router_handover, true); + listener.set(zmq::sockopt::router_mandatory, true); listener.bind(bind[i].first); LMQ_LOG(info, "LokiMQ listening on ", bind[i].first); @@ -552,7 +551,7 @@ static bool is_error_response(std::string_view cmd) { // reason) bool LokiMQ::proxy_handle_builtin(size_t conn_index, std::vector& parts) { // Doubling as a bool and an offset: - size_t incoming = connections[conn_index].getsockopt(ZMQ_TYPE) == ZMQ_ROUTER; + size_t incoming = connections[conn_index].get(zmq::sockopt::type) == ZMQ_ROUTER; std::string_view route, cmd; if (parts.size() < 1 + incoming) { diff --git a/lokimq/worker.cpp b/lokimq/worker.cpp index 7f09bce..548fb4d 100644 --- a/lokimq/worker.cpp +++ b/lokimq/worker.cpp @@ -35,7 +35,7 @@ bool worker_wait_for(LokiMQ& lmq, zmq::socket_t& sock, std::vector(ZMQ_LINGER, 1000); + sock.set(zmq::sockopt::linger, 1000); sock.close(); return false; } else { @@ -61,7 +61,7 @@ void LokiMQ::worker_thread(unsigned int index, std::optional tagged #endif zmq::socket_t sock{context, zmq::socket_type::dealer}; - sock.setsockopt(ZMQ_ROUTING_ID, routing_id.data(), routing_id.size()); + sock.set(zmq::sockopt::routing_id, routing_id); LMQ_LOG(debug, "New worker thread ", worker_id, " (", routing_id, ") started"); sock.connect(SN_ADDR_WORKERS); if (tagged) @@ -276,7 +276,7 @@ void LokiMQ::proxy_run_worker(run_info& run) { } void LokiMQ::proxy_to_worker(size_t conn_index, std::vector& parts) { - bool outgoing = connections[conn_index].getsockopt(ZMQ_TYPE) == ZMQ_DEALER; + bool outgoing = connections[conn_index].get(zmq::sockopt::type) == ZMQ_DEALER; peer_info tmp_peer; tmp_peer.conn_index = conn_index;