diff --git a/CMakeLists.txt b/CMakeLists.txt
index 51c4076..a72dc93 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -17,7 +17,7 @@ cmake_minimum_required(VERSION 3.7)
 set(CMAKE_OSX_DEPLOYMENT_TARGET 10.12 CACHE STRING "macOS deployment target (Apple clang only)")
 
 project(liboxenmq
-    VERSION 1.2.14
+    VERSION 1.2.16
     LANGUAGES CXX C)
 
 include(GNUInstallDirs)
@@ -39,9 +39,15 @@ set(oxenmq_INSTALL_DEFAULT OFF)
 if(BUILD_SHARED_LIBS OR oxenmq_IS_TOPLEVEL_PROJECT)
   set(oxenmq_INSTALL_DEFAULT ON)
 endif()
+set(oxenmq_EPOLL_DEFAULT OFF)
+if(CMAKE_SYSTEM_NAME STREQUAL "Linux" AND NOT CMAKE_CROSSCOMPILING)
+  set(oxenmq_EPOLL_DEFAULT ON)
+endif()
+
 option(OXENMQ_BUILD_TESTS "Building and perform oxenmq tests" ${oxenmq_IS_TOPLEVEL_PROJECT})
 option(OXENMQ_INSTALL "Add oxenmq libraries and headers to cmake install target; defaults to ON if BUILD_SHARED_LIBS is enabled or we are the top-level project; OFF for a static subdirectory build" ${oxenmq_INSTALL_DEFAULT})
 option(OXENMQ_INSTALL_CPPZMQ "Install cppzmq header with oxenmq/ headers (requires OXENMQ_INSTALL)" ON)
+option(OXENMQ_USE_EPOLL "Use epoll for socket polling (requires Linux)" ${oxenmq_EPOLL_DEFAULT})
 
 list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake")
@@ -59,6 +65,9 @@ add_library(oxenmq
     oxenmq/worker.cpp
 )
 set_target_properties(oxenmq PROPERTIES SOVERSION ${OXENMQ_LIBVERSION})
+if(OXENMQ_USE_EPOLL)
+  target_compile_definitions(oxenmq PRIVATE OXENMQ_USE_EPOLL)
+endif()
 
 set(THREADS_PREFER_PTHREAD_FLAG ON)
 find_package(Threads REQUIRED)
diff --git a/oxen-encoding b/oxen-encoding
index 707a836..462be41 160000
--- a/oxen-encoding
+++ b/oxen-encoding
@@ -1 +1 @@
-Subproject commit 707a83609fb64d09b61ed1e56c82bf692050d2a1
+Subproject commit 462be41bd481b331dabeb3c220b349ef35c89e56
diff --git a/oxenmq/connections.cpp b/oxenmq/connections.cpp
index c5ecd70..5eefc0f 100644
--- a/oxenmq/connections.cpp
+++ b/oxenmq/connections.cpp
@@ -3,12 +3,47 @@
 #include
 #include
 
+#ifdef OXENMQ_USE_EPOLL
+extern "C" {
+#include <sys/epoll.h>
+#include <unistd.h>
+}
+#endif
+
 namespace oxenmq {
 
 std::ostream& operator<<(std::ostream& o, const ConnectionID& conn) {
     return o << conn.to_string();
 }
 
+#ifdef OXENMQ_USE_EPOLL
+
+void OxenMQ::rebuild_pollitems() {
+
+    if (epoll_fd != -1)
+        close(epoll_fd);
+    epoll_fd = epoll_create1(0);
+
+    struct epoll_event ev;
+    ev.events = EPOLLIN | EPOLLET;
+    ev.data.u64 = EPOLL_COMMAND_ID;
+    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, command.get(zmq::sockopt::fd), &ev);
+
+    ev.data.u64 = EPOLL_WORKER_ID;
+    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, workers_socket.get(zmq::sockopt::fd), &ev);
+
+    ev.data.u64 = EPOLL_ZAP_ID;
+    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, zap_auth.get(zmq::sockopt::fd), &ev);
+
+    for (auto& [id, s] : connections) {
+        ev.data.u64 = id;
+        epoll_ctl(epoll_fd, EPOLL_CTL_ADD, s.get(zmq::sockopt::fd), &ev);
+    }
+    connections_updated = false;
+}
+
+#else // !OXENMQ_USE_EPOLL
+
 namespace {
 
 void add_pollitem(std::vector<zmq::pollitem_t>& pollitems, zmq::socket_t& sock) {
@@ -33,6 +68,8 @@ void OxenMQ::rebuild_pollitems() {
     connections_updated = false;
 }
 
+#endif // OXENMQ_USE_EPOLL
+
 void OxenMQ::setup_external_socket(zmq::socket_t& socket) {
     socket.set(zmq::sockopt::reconnect_ivl, (int) RECONNECT_INTERVAL.count());
     socket.set(zmq::sockopt::reconnect_ivl_max, (int) RECONNECT_INTERVAL_MAX.count());
diff --git a/oxenmq/oxenmq-internal.h b/oxenmq/oxenmq-internal.h
index 3e7dde4..dc5ef25 100644
--- a/oxenmq/oxenmq-internal.h
+++ b/oxenmq/oxenmq-internal.h
@@ -1,4 +1,5 @@
 #pragma once
+#include <limits>
 #include "oxenmq.h"
 
 // Inside some method:
@@ -20,6 +21,14 @@ constexpr char SN_ADDR_WORKERS[] = "inproc://sn-workers";
 constexpr char SN_ADDR_SELF[] = "inproc://sn-self";
 constexpr char ZMQ_ADDR_ZAP[] = "inproc://zeromq.zap.01";
 
+#ifdef OXENMQ_USE_EPOLL
+
+constexpr auto EPOLL_COMMAND_ID = std::numeric_limits<uint64_t>::max();
+constexpr auto EPOLL_WORKER_ID = std::numeric_limits<uint64_t>::max() - 1;
+constexpr auto EPOLL_ZAP_ID = std::numeric_limits<uint64_t>::max() - 2;
+
+#endif
+
 /// Destructor for create_message(std::string&&) that zmq calls when it's done with the message.
 extern "C" inline void message_buffer_destroy(void*, void* hint) {
     delete reinterpret_cast<std::string*>(hint);
diff --git a/oxenmq/oxenmq.cpp b/oxenmq/oxenmq.cpp
index 06bfdd2..7dd1d50 100644
--- a/oxenmq/oxenmq.cpp
+++ b/oxenmq/oxenmq.cpp
@@ -460,11 +460,12 @@ std::ostream &operator<<(std::ostream &os, LogLevel lvl) {
 
 std::string make_random_string(size_t size) {
     static thread_local std::mt19937_64 rng{std::random_device{}()};
-    static thread_local std::uniform_int_distribution<char> dist{std::numeric_limits<char>::min(), std::numeric_limits<char>::max()};
     std::string rando;
     rando.reserve(size);
-    for (size_t i = 0; i < size; i++)
-        rando += dist(rng);
+    while (rando.size() < size) {
+        uint64_t x = rng();
+        rando.append(reinterpret_cast<const char*>(&x), std::min<size_t>(size - rando.size(), 8));
+    }
     return rando;
 }
diff --git a/oxenmq/oxenmq.h b/oxenmq/oxenmq.h
index fe53627..04dd46a 100644
--- a/oxenmq/oxenmq.h
+++ b/oxenmq/oxenmq.h
@@ -420,8 +420,14 @@ private:
     /// sockets for inter-thread communication followed by a pollitem for every connection (both
     /// incoming and outgoing) in `connections`.  We rebuild this from `connections` whenever
     /// `connections_updated` is set to true.
+    ///
+    /// On Linux, when using epoll, this is not used.
     std::vector<zmq::pollitem_t> pollitems;
 
+    /// On Linux, when using epoll, this instead tracks the epoll file descriptor; it is
+    /// unused otherwise.
+    int epoll_fd = -1;
+
     /// Rebuilds pollitems to include the internal sockets + all incoming/outgoing sockets.
     void rebuild_pollitems();
diff --git a/oxenmq/proxy.cpp b/oxenmq/proxy.cpp
index aa0c166..a04460f 100644
--- a/oxenmq/proxy.cpp
+++ b/oxenmq/proxy.cpp
@@ -11,6 +11,10 @@ extern "C" {
 }
 #endif
 
+#ifdef OXENMQ_USE_EPOLL
+#include <sys/epoll.h>
+#endif
+
 #ifndef _WIN32
 extern "C" {
 #include
@@ -496,6 +500,12 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
     // General vector for handling incoming messages:
     std::vector<zmq::message_t> parts;
 
+    std::vector<std::pair<const int64_t, zmq::socket_t>*> queue; // Used as a circular buffer
+
+#ifdef OXENMQ_USE_EPOLL
+    std::vector<struct epoll_event> evs;
+#endif
+
     while (true) {
         std::chrono::milliseconds poll_timeout;
         if (max_workers == 0) { // Will be 0 only if we are quitting
@@ -509,9 +519,52 @@
             poll_timeout = std::chrono::milliseconds{zmq_timers_timeout(timers.get())};
         }
 
-        if (connections_updated)
+        if (connections_updated) {
             rebuild_pollitems();
+            // If we just rebuilt the queue then do a full check of everything, because we might
+            // have sockets that already edge-triggered that we need to fully drain before we start
+            // polling.
+            proxy_skip_one_poll = true;
+        }
+
+        // We round-robin connections when pulling off pending messages one-by-one rather than
+        // pulling off all messages from one connection before moving to the next; thus in cases of
+        // contention we end up fairly distributing.
+        queue.reserve(connections.size() + 1);
+
+#ifdef OXENMQ_USE_EPOLL
+        bool process_command = false, process_worker = false, process_zap = false, process_all = false;
+
+        if (proxy_skip_one_poll) {
+            proxy_skip_one_poll = false;
+
+            process_command = command.get(zmq::sockopt::events) & ZMQ_POLLIN;
+            process_worker = workers_socket.get(zmq::sockopt::events) & ZMQ_POLLIN;
+            process_zap = zap_auth.get(zmq::sockopt::events) & ZMQ_POLLIN;
+            process_all = true;
+        }
+        else {
+            OMQ_TRACE("polling for new messages via epoll");
+
+            evs.resize(3 + connections.size());
+            const int max = epoll_wait(epoll_fd, evs.data(), evs.size(), poll_timeout.count());
+
+            queue.clear();
+            for (int i = 0; i < max; i++) {
+                const auto conn_id = evs[i].data.u64;
+                if (conn_id == EPOLL_COMMAND_ID)
+                    process_command = true;
+                else if (conn_id == EPOLL_WORKER_ID)
+                    process_worker = true;
+                else if (conn_id == EPOLL_ZAP_ID)
+                    process_zap = true;
+                else if (auto it = connections.find(conn_id); it != connections.end())
+                    queue.push_back(&*it);
+            }
+            queue.push_back(nullptr);
+        }
+
+#else
         if (proxy_skip_one_poll)
             proxy_skip_one_poll = false;
         else {
@@ -524,23 +577,29 @@
             zmq::poll(pollitems.data(), pollitems.size(), poll_timeout);
         }
 
-        OMQ_TRACE("processing control messages");
-        // Retrieve any waiting incoming control messages
-        while (size_t len = recv_message_parts(command, control_parts, zmq::recv_flags::dontwait)) {
-            proxy_control_message(control_parts, len);
+        constexpr bool process_command = true, process_worker = true, process_zap = true, process_all = true;
+#endif
+
+        if (process_command) {
+            OMQ_TRACE("processing control messages");
+            while (size_t len = recv_message_parts(command, control_parts, zmq::recv_flags::dontwait))
+                proxy_control_message(control_parts, len);
         }
 
-        OMQ_TRACE("processing worker messages");
-        while (size_t len = recv_message_parts(workers_socket, control_parts, zmq::recv_flags::dontwait)) {
-            proxy_worker_message(control_parts, len);
+        if (process_worker) {
+            OMQ_TRACE("processing worker messages");
+            while (size_t len = recv_message_parts(workers_socket, control_parts, zmq::recv_flags::dontwait))
+                proxy_worker_message(control_parts, len);
         }
 
         OMQ_TRACE("processing timers");
         zmq_timers_execute(timers.get());
 
-        // Handle any zap authentication
-        OMQ_TRACE("processing zap requests");
-        process_zap_requests();
+        if (process_zap) {
+            // Handle any zap authentication
+            OMQ_TRACE("processing zap requests");
+            process_zap_requests();
+        }
 
         // See if we can drain anything from the current queue before we potentially add to it
         // below.
@@ -548,15 +607,14 @@
             proxy_process_queue();
 
         OMQ_TRACE("processing new incoming messages");
+        if (process_all) {
+            queue.clear();
+            for (auto& id_sock : connections)
+                if (id_sock.second.get(zmq::sockopt::events) & ZMQ_POLLIN)
+                    queue.push_back(&id_sock);
+            queue.push_back(nullptr);
+        }
 
-        // We round-robin connections when pulling off pending messages one-by-one rather than
-        // pulling off all messages from one connection before moving to the next; thus in cases of
-        // contention we end up fairly distributing.
-        std::vector<std::pair<const int64_t, zmq::socket_t>*> queue; // Used as a circular buffer
-        queue.reserve(connections.size() + 1);
-        for (auto& id_sock : connections)
-            queue.push_back(&id_sock);
-        queue.push_back(nullptr);
         size_t end = queue.size() - 1;
 
         for (size_t pos = 0; pos != end; ++pos %= queue.size()) {
@@ -580,13 +638,40 @@
                 proxy_to_worker(id, sock, parts);
 
             if (connections_updated) {
-                // If connections got updated then our points are stale, to restart the proxy loop;
-                // if there are still messages waiting we'll end up right back here.
+                // If connections got updated then our pointers are stale, so restart the proxy loop;
+                // we'll immediately end up right back here at least once before we resume polling.
                 OMQ_TRACE("connections became stale; short-circuiting incoming message loop");
                 break;
             }
         }
 
+#ifdef OXENMQ_USE_EPOLL
+        // If any socket still has ZMQ_POLLIN (which is possible if something we did above changed
+        // state on another socket, perhaps by writing to it) then we need to repeat the loop
+        // *without* going back to epoll again, until we get through everything without any
+        // ZMQ_POLLIN sockets.  If we didn't, we could miss it and might end up deadlocked because
+        // of ZMQ's edge-triggered notifications on zmq fd's.
+        //
+        // More info on the complexities here at https://github.com/zeromq/libzmq/issues/3641 and
+        // https://funcptr.net/2012/09/10/zeromq---edge-triggered-notification/
+        if (!connections_updated && !proxy_skip_one_poll) {
+            for (auto* s : {&command, &workers_socket, &zap_auth}) {
+                if (s->get(zmq::sockopt::events) & ZMQ_POLLIN) {
+                    proxy_skip_one_poll = true;
+                    break;
+                }
+            }
+            if (!proxy_skip_one_poll) {
+                for (auto& [id, sock] : connections) {
+                    if (sock.get(zmq::sockopt::events) & ZMQ_POLLIN) {
+                        proxy_skip_one_poll = true;
+                        break;
+                    }
+                }
+            }
+        }
+#endif
+
         OMQ_TRACE("done proxy loop");
     }
 }
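
Aside on the approach (not part of the patch itself): the fd that zmq exposes via ZMQ_FD / zmq::sockopt::fd is an edge-triggered signalling fd, not the underlying network fd. It only guarantees a wakeup when socket state changes; actual readiness has to be re-read from zmq::sockopt::events, draining with dontwait until POLLIN clears, which is exactly what the post-processing check and proxy_skip_one_poll above accomplish. A minimal standalone sketch of the same pattern, assuming cppzmq on Linux (the PULL socket and the endpoint are invented for illustration):

    // Build sketch: g++ -std=c++17 demo.cpp -lzmq
    #include <zmq.hpp>
    extern "C" {
    #include <sys/epoll.h>
    #include <unistd.h>
    }

    int main() {
        zmq::context_t ctx;
        zmq::socket_t sock{ctx, zmq::socket_type::pull};
        sock.bind("tcp://127.0.0.1:5555");  // hypothetical endpoint

        int efd = epoll_create1(0);
        epoll_event ev{};
        ev.events = EPOLLIN | EPOLLET;  // edge-triggered, as in rebuild_pollitems()
        epoll_ctl(efd, EPOLL_CTL_ADD, sock.get(zmq::sockopt::fd), &ev);

        epoll_event out;
        while (epoll_wait(efd, &out, 1, -1) >= 0) {
            // After a wakeup we must drain until ZMQ_EVENTS stops reporting
            // POLLIN: the edge has been consumed, so anything left unread now
            // would not produce another epoll wakeup later.
            while (sock.get(zmq::sockopt::events) & ZMQ_POLLIN) {
                zmq::message_t msg;
                if (!sock.recv(msg, zmq::recv_flags::dontwait))
                    break;
                // ... handle msg ...
            }
        }
        close(efd);
    }

The proxy loop generalizes this single-socket sketch by tagging each registered fd with a connection id in ev.data.u64 and re-checking every socket's events after each pass instead of returning straight to epoll_wait.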