From 8f97add30fa241047f4445529fdc3d928d3c12ed Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Thu, 14 Sep 2023 14:38:39 -0300 Subject: [PATCH] Add epoll support for Linux Each call to zmq::poll is painfully slow when we have many open zmq sockets, such as when we have 1800 outbound connections (i.e. connected to every other service node, as services nodes might have sometimes and the Session push notification server *always* has). In testing on my local Ryzen 5950 system each time we go back to zmq::poll incurs about 1.5ms of (mostly system) CPU time with 2000 open outbound sockets, and so if we're being pelted with a nearly constant stream of requests (such as happens with the Session push notification server) we incur massive CPU costs every time we finish processing messages and go back to wait (via zmq::poll) for more. In testing a simple ZMQ (no OxenMQ) client/server that establishes 2000 connections to a server, and then has the server send a message back on a random connection every 1ms, we get atrocious CPU usage: the proxy thread spends a constant 100% CPU time. Virtually all of this is in the poll call itself, though, so we aren't really bottlenecked by how much can go through the proxy thread: in such a scenario the poll call uses its CPU then returns right away, we process the queue of messages, and return to another poll call. If we have lots of messages received in that time, though (because messages are coming fast and the poll was slow) then we process a lot all at once before going back to the poll, so the main consequences here are that: 1) We use a huge amount of CPU 2) We introduce latency in a busy situation because the CPU has to make the poll call (e.g. 1.5ms) before the next message can be processed. 3) If traffic is very bursty then the latency can manifest another problem: in the time it takes to poll we could accumulate enough incoming messages to overfill our internal per-category job queue, which was happening in the SPNS. (I also tested with 20k connections, and the poll time scaling was linear: we still processed everything, but in larger chunks because every poll call took about 15ms, and so we'd have about 15 messages at a time to process with added latency of up to 15ms). Switching to epoll *drastically* reduces the CPU usage in two ways: 1) It's massively faster by design: there's a single setup and communication of all the polling details to the kernel which we only have to do when our set of zmq sockets changes (which is relatively rare). 2) We can further reduce CPU time because epoll tells us *which* sockets need attention, and so if only 1 connection out of the 2000 sent us something we can only bother checking that single socket for messages. (In theory we can do the same with zmq::poll by querying for events available on the socket, but in practice it doesn't improve anything over just trying to read from them all). In my straight zmq test script, using epoll instead reduced CPU usage in the sends-every-1ms scenario from a constant pegged 100% of a core to an average of 2-3% of a single core. (Moreover this CPU usage level didn't noticeably change when using 20k connections instead of 2k). --- CMakeLists.txt | 11 ++++- oxenmq/connections.cpp | 37 +++++++++++++++ oxenmq/oxenmq-internal.h | 9 ++++ oxenmq/oxenmq.h | 6 +++ oxenmq/proxy.cpp | 99 +++++++++++++++++++++++++++++++--------- 5 files changed, 140 insertions(+), 22 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 47b8a68..a72dc93 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -17,7 +17,7 @@ cmake_minimum_required(VERSION 3.7) set(CMAKE_OSX_DEPLOYMENT_TARGET 10.12 CACHE STRING "macOS deployment target (Apple clang only)") project(liboxenmq - VERSION 1.2.15 + VERSION 1.2.16 LANGUAGES CXX C) include(GNUInstallDirs) @@ -39,9 +39,15 @@ set(oxenmq_INSTALL_DEFAULT OFF) if(BUILD_SHARED_LIBS OR oxenmq_IS_TOPLEVEL_PROJECT) set(oxenmq_INSTALL_DEFAULT ON) endif() +set(oxenmq_EPOLL_DEFAULT OFF) +if(CMAKE_SYSTEM_NAME STREQUAL "Linux" AND NOT CMAKE_CROSSCOMPILING) + set(oxenmq_EPOLL_DEFAULT ON) +endif() + option(OXENMQ_BUILD_TESTS "Building and perform oxenmq tests" ${oxenmq_IS_TOPLEVEL_PROJECT}) option(OXENMQ_INSTALL "Add oxenmq libraries and headers to cmake install target; defaults to ON if BUILD_SHARED_LIBS is enabled or we are the top-level project; OFF for a static subdirectory build" ${oxenmq_INSTALL_DEFAULT}) option(OXENMQ_INSTALL_CPPZMQ "Install cppzmq header with oxenmq/ headers (requires OXENMQ_INSTALL)" ON) +option(OXENMQ_USE_EPOLL "Use epoll for socket polling (requires Linux)" ${oxenmq_EPOLL_DEFAULT}) list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake") @@ -59,6 +65,9 @@ add_library(oxenmq oxenmq/worker.cpp ) set_target_properties(oxenmq PROPERTIES SOVERSION ${OXENMQ_LIBVERSION}) +if(OXENMQ_USE_EPOLL) + target_compile_definitions(oxenmq PRIVATE OXENMQ_USE_EPOLL) +endif() set(THREADS_PREFER_PTHREAD_FLAG ON) find_package(Threads REQUIRED) diff --git a/oxenmq/connections.cpp b/oxenmq/connections.cpp index c5ecd70..5eefc0f 100644 --- a/oxenmq/connections.cpp +++ b/oxenmq/connections.cpp @@ -3,12 +3,47 @@ #include #include +#ifdef OXENMQ_USE_EPOLL +extern "C" { +#include +#include +} +#endif + namespace oxenmq { std::ostream& operator<<(std::ostream& o, const ConnectionID& conn) { return o << conn.to_string(); } +#ifdef OXENMQ_USE_EPOLL + +void OxenMQ::rebuild_pollitems() { + + if (epoll_fd != -1) + close(epoll_fd); + epoll_fd = epoll_create1(0); + + struct epoll_event ev; + ev.events = EPOLLIN | EPOLLET; + ev.data.u64 = EPOLL_COMMAND_ID; + epoll_ctl(epoll_fd, EPOLL_CTL_ADD, command.get(zmq::sockopt::fd), &ev); + + ev.data.u64 = EPOLL_WORKER_ID; + epoll_ctl(epoll_fd, EPOLL_CTL_ADD, workers_socket.get(zmq::sockopt::fd), &ev); + + ev.data.u64 = EPOLL_ZAP_ID; + epoll_ctl(epoll_fd, EPOLL_CTL_ADD, zap_auth.get(zmq::sockopt::fd), &ev); + + for (auto& [id, s] : connections) { + ev.data.u64 = id; + epoll_ctl(epoll_fd, EPOLL_CTL_ADD, s.get(zmq::sockopt::fd), &ev); + } + connections_updated = false; +} + +#else // !OXENMQ_USE_EPOLL + namespace { void add_pollitem(std::vector& pollitems, zmq::socket_t& sock) { @@ -33,6 +68,8 @@ void OxenMQ::rebuild_pollitems() { connections_updated = false; } +#endif // OXENMQ_USE_EPOLL + void OxenMQ::setup_external_socket(zmq::socket_t& socket) { socket.set(zmq::sockopt::reconnect_ivl, (int) RECONNECT_INTERVAL.count()); socket.set(zmq::sockopt::reconnect_ivl_max, (int) RECONNECT_INTERVAL_MAX.count()); diff --git a/oxenmq/oxenmq-internal.h b/oxenmq/oxenmq-internal.h index 3e7dde4..dc5ef25 100644 --- a/oxenmq/oxenmq-internal.h +++ b/oxenmq/oxenmq-internal.h @@ -1,4 +1,5 @@ #pragma once +#include #include "oxenmq.h" // Inside some method: @@ -20,6 +21,14 @@ constexpr char SN_ADDR_WORKERS[] = "inproc://sn-workers"; constexpr char SN_ADDR_SELF[] = "inproc://sn-self"; constexpr char ZMQ_ADDR_ZAP[] = "inproc://zeromq.zap.01"; +#ifdef OXENMQ_USE_EPOLL + +constexpr auto EPOLL_COMMAND_ID = std::numeric_limits::max(); +constexpr auto EPOLL_WORKER_ID = std::numeric_limits::max() - 1; +constexpr auto EPOLL_ZAP_ID = std::numeric_limits::max() - 2; + +#endif + /// Destructor for create_message(std::string&&) that zmq calls when it's done with the message. extern "C" inline void message_buffer_destroy(void*, void* hint) { delete reinterpret_cast(hint); diff --git a/oxenmq/oxenmq.h b/oxenmq/oxenmq.h index fe53627..04dd46a 100644 --- a/oxenmq/oxenmq.h +++ b/oxenmq/oxenmq.h @@ -420,8 +420,14 @@ private: /// sockets for inter-thread communication followed by a pollitem for every connection (both /// incoming and outgoing) in `connections`. We rebuild this from `connections` whenever /// `connections_updated` is set to true. + /// + /// On Linux, when using epoll, this is not used. std::vector pollitems; + /// On Linux, when using epoll, this tracks the epoll file descriptor. Otherwise it does + /// nothing. + int epoll_fd = -1; + /// Rebuilds pollitems to include the internal sockets + all incoming/outgoing sockets. void rebuild_pollitems(); diff --git a/oxenmq/proxy.cpp b/oxenmq/proxy.cpp index aa0c166..8d4df9d 100644 --- a/oxenmq/proxy.cpp +++ b/oxenmq/proxy.cpp @@ -11,6 +11,10 @@ extern "C" { } #endif +#ifdef OXENMQ_USE_EPOLL +#include +#endif + #ifndef _WIN32 extern "C" { #include @@ -496,6 +500,12 @@ void OxenMQ::proxy_loop(std::promise startup) { // General vector for handling incoming messages: std::vector parts; + std::vector*> queue; // Used as a circular buffer + +#ifdef OXENMQ_USE_EPOLL + std::vector evs; +#endif + while (true) { std::chrono::milliseconds poll_timeout; if (max_workers == 0) { // Will be 0 only if we are quitting @@ -509,9 +519,48 @@ void OxenMQ::proxy_loop(std::promise startup) { poll_timeout = std::chrono::milliseconds{zmq_timers_timeout(timers.get())}; } - if (connections_updated) + if (connections_updated) { rebuild_pollitems(); + // If we just rebuilt the queue then do a full check of everything, because we might + // have sockets that already edge-triggered that we need to fully drain before we start + // polling. + proxy_skip_one_poll = true; + } + // We round-robin connections when pulling off pending messages one-by-one rather than + // pulling off all messages from one connection before moving to the next; thus in cases of + // contention we end up fairly distributing. + queue.reserve(connections.size() + 1); + +#ifdef OXENMQ_USE_EPOLL + bool process_control = false, process_worker = false, process_zap = false, process_all = false; + + if (proxy_skip_one_poll) { + proxy_skip_one_poll = false; + process_all = true; + } + else { + OMQ_TRACE("polling for new messages via epoll"); + + evs.resize(3 + connections.size()); + const int max = epoll_wait(epoll_fd, evs.data(), evs.size(), poll_timeout.count()); + + queue.clear(); + for (int i = 0; i < max; i++) { + const auto conn_id = evs[i].data.u64; + if (conn_id == EPOLL_COMMAND_ID) + process_control = true; + else if (conn_id == EPOLL_WORKER_ID) + process_worker = true; + else if (conn_id == EPOLL_ZAP_ID) + process_zap = true; + else if (auto it = connections.find(conn_id); it != connections.end()) + queue.push_back(&*it); + } + queue.push_back(nullptr); + } + +#else if (proxy_skip_one_poll) proxy_skip_one_poll = false; else { @@ -524,23 +573,32 @@ void OxenMQ::proxy_loop(std::promise startup) { zmq::poll(pollitems.data(), pollitems.size(), poll_timeout); } - OMQ_TRACE("processing control messages"); - // Retrieve any waiting incoming control messages - while (size_t len = recv_message_parts(command, control_parts, zmq::recv_flags::dontwait)) { - proxy_control_message(control_parts, len); + constexpr bool process_control = true, process_worker = true, process_zap = true, process_all = true; +#endif + + if (process_control || process_all) { + OMQ_TRACE("processing control messages"); + // Retrieve any waiting incoming control messages + while (size_t len = recv_message_parts(command, control_parts, zmq::recv_flags::dontwait)) { + proxy_control_message(control_parts, len); + } } - OMQ_TRACE("processing worker messages"); - while (size_t len = recv_message_parts(workers_socket, control_parts, zmq::recv_flags::dontwait)) { - proxy_worker_message(control_parts, len); + if (process_worker || process_all) { + OMQ_TRACE("processing worker messages"); + while (size_t len = recv_message_parts(workers_socket, control_parts, zmq::recv_flags::dontwait)) { + proxy_worker_message(control_parts, len); + } } OMQ_TRACE("processing timers"); zmq_timers_execute(timers.get()); - // Handle any zap authentication - OMQ_TRACE("processing zap requests"); - process_zap_requests(); + if (process_zap || process_all) { + // Handle any zap authentication + OMQ_TRACE("processing zap requests"); + process_zap_requests(); + } // See if we can drain anything from the current queue before we potentially add to it // below. @@ -548,15 +606,14 @@ void OxenMQ::proxy_loop(std::promise startup) { proxy_process_queue(); OMQ_TRACE("processing new incoming messages"); + if (process_all) { + queue.resize(connections.size() + 1); + int i = 0; + for (auto& id_sock : connections) + queue[i++] = &id_sock; + queue[i] = nullptr; + } - // We round-robin connections when pulling off pending messages one-by-one rather than - // pulling off all messages from one connection before moving to the next; thus in cases of - // contention we end up fairly distributing. - std::vector*> queue; // Used as a circular buffer - queue.reserve(connections.size() + 1); - for (auto& id_sock : connections) - queue.push_back(&id_sock); - queue.push_back(nullptr); size_t end = queue.size() - 1; for (size_t pos = 0; pos != end; ++pos %= queue.size()) { @@ -580,8 +637,8 @@ void OxenMQ::proxy_loop(std::promise startup) { proxy_to_worker(id, sock, parts); if (connections_updated) { - // If connections got updated then our points are stale, to restart the proxy loop; - // if there are still messages waiting we'll end up right back here. + // If connections got updated then our points are stale, so restart the proxy loop; + // we'll immediately end up right back here at least once before we resume polling. OMQ_TRACE("connections became stale; short-circuiting incoming message loop"); break; }