Add epoll support for Linux

Each call to zmq::poll is painfully slow when we have many open zmq
sockets, such as when we have 1800 outbound connections (i.e. connected
to every other service node, as service nodes sometimes are and as the
Session push notification server *always* is).

In testing on my local Ryzen 5950 system, each return to zmq::poll
incurs about 1.5ms of (mostly system) CPU time with 2000 open outbound
sockets, and so if we're being pelted with a nearly constant stream of
requests (such as happens with the Session push notification server) we
incur massive CPU costs every time we finish processing messages and go
back to wait (via zmq::poll) for more.
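
(For context, the pre-epoll wait looks roughly like the sketch below;
this is a simplified illustration with made-up helper names, not the
literal OxenMQ code.  The pollitem list is only rebuilt when connections
change, but every wakeup still submits the whole list to the kernel,
which is where that ~1.5ms goes.)

    // Rough sketch of the zmq::poll-based wait used without epoll.
    #include <zmq.hpp>
    #include <chrono>
    #include <vector>

    std::vector<zmq::pollitem_t> build_pollitems(std::vector<zmq::socket_t>& sockets) {
        std::vector<zmq::pollitem_t> items;
        items.reserve(sockets.size());
        for (auto& s : sockets)
            items.push_back(zmq::pollitem_t{static_cast<void*>(s), 0, ZMQ_POLLIN, 0});
        return items;
    }

    void wait_for_activity(std::vector<zmq::pollitem_t>& pollitems,
                           std::chrono::milliseconds timeout) {
        // O(number of sockets) on *every* call, even if only one (or none)
        // of them has anything pending:
        zmq::poll(pollitems.data(), pollitems.size(), timeout);
        // ...and afterwards we still attempt a non-blocking read on every
        // socket to find the ones that actually have messages waiting.
    }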

In testing a simple ZMQ (no OxenMQ) client/server that establishes 2000
connections to a server, and then has the server send a message back on
a random connection every 1ms, we get atrocious CPU usage: the proxy
thread sits at a constant 100% CPU.  Virtually all of this is in the
poll call itself, though, so we aren't really bottlenecked by how much
can go through the proxy thread: in such a scenario the poll call burns
its CPU then returns right away, we process the queue of messages, and
return to another poll call.  If lots of messages arrived in that time,
though (because messages are coming fast and the poll was slow), then we
process a lot all at once before going back to the poll, so the main
consequences here are that:

1) We use a huge amount of CPU.
2) We introduce latency in a busy situation because we have to finish
   the poll call (e.g. 1.5ms) before the next message can be processed.
3) If traffic is very bursty then the latency can manifest another
   problem: in the time it takes to poll we could accumulate enough
   incoming messages to overfill our internal per-category job queue,
   which was happening in the SPNS.

(I also tested with 20k connections, and the poll time scaled linearly:
we still processed everything, but in larger chunks, because every poll
call took about 15ms and so we'd have about 15 messages at a time to
process, with added latency of up to 15ms.)

Switching to epoll *drastically* reduces the CPU usage in two ways:

1) It's massively faster by design: there's a single setup and
   communication of all the polling details to the kernel which we only
   have to do when our set of zmq sockets changes (which is relatively
   rare).
2) We can further reduce CPU time because epoll tells us *which* sockets
   need attention, and so if only 1 connection out of the 2000 sent us
   something we only have to check that single socket for messages.  (In
   theory we can do the same with zmq::poll by querying for events
   available on the socket, but in practice it doesn't improve anything
   over just trying to read from them all.)  A rough sketch of this
   pattern follows below.
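
This is a hedged illustration with made-up helper names, not the exact
OxenMQ internals: each zmq socket's underlying file descriptor is
registered once, edge-triggered, with a connection id stored in the
epoll user data, and each wakeup then returns only the ids that
actually fired.

    // Sketch of the epoll-based wait, assuming Linux and cppzmq.
    #include <sys/epoll.h>
    #include <unistd.h>
    #include <zmq.hpp>
    #include <cstdint>
    #include <map>
    #include <vector>

    // Register every connection's fd once; redo this only when the set of
    // connections changes.
    int setup_epoll(std::map<uint64_t, zmq::socket_t>& connections) {
        int efd = epoll_create1(0);
        for (auto& [id, sock] : connections) {
            epoll_event ev{};
            ev.events = EPOLLIN | EPOLLET;   // edge-triggered
            ev.data.u64 = id;                // so we know which connection fired
            epoll_ctl(efd, EPOLL_CTL_ADD, sock.get(zmq::sockopt::fd), &ev);
        }
        return efd;
    }

    // Wait, and return only the ids of connections that need attention.
    std::vector<uint64_t> wait_epoll(int efd, size_t max_events, int timeout_ms) {
        std::vector<epoll_event> evs(max_events);
        int n = epoll_wait(efd, evs.data(), static_cast<int>(evs.size()), timeout_ms);
        std::vector<uint64_t> ready;
        for (int i = 0; i < n; i++)
            ready.push_back(evs[i].data.u64);
        return ready;
    }

(The real code in this commit also registers the internal command,
worker, and ZAP sockets under reserved sentinel ids; see
EPOLL_COMMAND_ID and friends in the diff below.)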

In my straight zmq test script, using epoll instead reduced CPU usage in
the sends-every-1ms scenario from a constant pegged 100% of a core to an
average of 2-3% of a single core.  (Moreover, this CPU usage didn't
noticeably change when using 20k connections instead of 2k.)
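
One subtlety the epoll path has to handle (visible in the proxy loop
changes below): the file descriptor that zmq exposes via
zmq::sockopt::fd signals readiness edge-style, so once epoll flags a
socket we have to keep doing non-blocking reads until it is empty, and
after rebuilding the epoll set we do one full pass over every socket in
case something edge-triggered before we were watching.  A hedged sketch
of such a drain loop (the real code goes through its
recv_message_parts() helper instead):

    // Drain one flagged zmq socket completely with non-blocking reads.
    #include <zmq.hpp>
    #include <utility>
    #include <vector>

    void drain_socket(zmq::socket_t& sock) {
        while (true) {
            zmq::message_t part;
            // Stop as soon as nothing is left; required for the edge-triggered
            // notification to re-arm for this socket.
            if (!sock.recv(part, zmq::recv_flags::dontwait))
                return;
            std::vector<zmq::message_t> parts;
            parts.push_back(std::move(part));
            // Remaining parts of a multipart message are always available once
            // the first part has arrived:
            while (sock.get(zmq::sockopt::rcvmore)) {
                (void) sock.recv(part, zmq::recv_flags::dontwait);
                parts.push_back(std::move(part));
            }
            // ... hand `parts` (one complete message) off for processing ...
        }
    }
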
Jason Rhinelander 2023-09-14 14:38:39 -03:00
parent e1b66ced48
commit 8f97add30f
5 changed files with 140 additions and 22 deletions

@@ -17,7 +17,7 @@ cmake_minimum_required(VERSION 3.7)
set(CMAKE_OSX_DEPLOYMENT_TARGET 10.12 CACHE STRING "macOS deployment target (Apple clang only)")
project(liboxenmq
VERSION 1.2.15
VERSION 1.2.16
LANGUAGES CXX C)
include(GNUInstallDirs)
@@ -39,9 +39,15 @@ set(oxenmq_INSTALL_DEFAULT OFF)
if(BUILD_SHARED_LIBS OR oxenmq_IS_TOPLEVEL_PROJECT)
set(oxenmq_INSTALL_DEFAULT ON)
endif()
set(oxenmq_EPOLL_DEFAULT OFF)
if(CMAKE_SYSTEM_NAME STREQUAL "Linux" AND NOT CMAKE_CROSSCOMPILING)
set(oxenmq_EPOLL_DEFAULT ON)
endif()
option(OXENMQ_BUILD_TESTS "Building and perform oxenmq tests" ${oxenmq_IS_TOPLEVEL_PROJECT})
option(OXENMQ_INSTALL "Add oxenmq libraries and headers to cmake install target; defaults to ON if BUILD_SHARED_LIBS is enabled or we are the top-level project; OFF for a static subdirectory build" ${oxenmq_INSTALL_DEFAULT})
option(OXENMQ_INSTALL_CPPZMQ "Install cppzmq header with oxenmq/ headers (requires OXENMQ_INSTALL)" ON)
option(OXENMQ_USE_EPOLL "Use epoll for socket polling (requires Linux)" ${oxenmq_EPOLL_DEFAULT})
list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake")
@@ -59,6 +65,9 @@ add_library(oxenmq
oxenmq/worker.cpp
)
set_target_properties(oxenmq PROPERTIES SOVERSION ${OXENMQ_LIBVERSION})
if(OXENMQ_USE_EPOLL)
target_compile_definitions(oxenmq PRIVATE OXENMQ_USE_EPOLL)
endif()
set(THREADS_PREFER_PTHREAD_FLAG ON)
find_package(Threads REQUIRED)

@@ -3,12 +3,47 @@
#include <oxenc/hex.h>
#include <optional>
#ifdef OXENMQ_USE_EPOLL
extern "C" {
#include <sys/epoll.h>
#include <unistd.h>
}
#endif
namespace oxenmq {
std::ostream& operator<<(std::ostream& o, const ConnectionID& conn) {
return o << conn.to_string();
}
#ifdef OXENMQ_USE_EPOLL
void OxenMQ::rebuild_pollitems() {
if (epoll_fd != -1)
close(epoll_fd);
epoll_fd = epoll_create1(0);
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET;
ev.data.u64 = EPOLL_COMMAND_ID;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, command.get(zmq::sockopt::fd), &ev);
ev.data.u64 = EPOLL_WORKER_ID;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, workers_socket.get(zmq::sockopt::fd), &ev);
ev.data.u64 = EPOLL_ZAP_ID;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, zap_auth.get(zmq::sockopt::fd), &ev);
for (auto& [id, s] : connections) {
ev.data.u64 = id;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, s.get(zmq::sockopt::fd), &ev);
}
connections_updated = false;
}
#else // !OXENMQ_USE_EPOLL
namespace {
void add_pollitem(std::vector<zmq::pollitem_t>& pollitems, zmq::socket_t& sock) {
@@ -33,6 +68,8 @@ void OxenMQ::rebuild_pollitems() {
connections_updated = false;
}
#endif // OXENMQ_USE_EPOLL
void OxenMQ::setup_external_socket(zmq::socket_t& socket) {
socket.set(zmq::sockopt::reconnect_ivl, (int) RECONNECT_INTERVAL.count());
socket.set(zmq::sockopt::reconnect_ivl_max, (int) RECONNECT_INTERVAL_MAX.count());

@@ -1,4 +1,5 @@
#pragma once
#include <limits>
#include "oxenmq.h"
// Inside some method:
@@ -20,6 +21,14 @@ constexpr char SN_ADDR_WORKERS[] = "inproc://sn-workers";
constexpr char SN_ADDR_SELF[] = "inproc://sn-self";
constexpr char ZMQ_ADDR_ZAP[] = "inproc://zeromq.zap.01";
#ifdef OXENMQ_USE_EPOLL
constexpr auto EPOLL_COMMAND_ID = std::numeric_limits<uint64_t>::max();
constexpr auto EPOLL_WORKER_ID = std::numeric_limits<uint64_t>::max() - 1;
constexpr auto EPOLL_ZAP_ID = std::numeric_limits<uint64_t>::max() - 2;
#endif
/// Destructor for create_message(std::string&&) that zmq calls when it's done with the message.
extern "C" inline void message_buffer_destroy(void*, void* hint) {
delete reinterpret_cast<std::string*>(hint);

@@ -420,8 +420,14 @@ private:
/// sockets for inter-thread communication followed by a pollitem for every connection (both
/// incoming and outgoing) in `connections`. We rebuild this from `connections` whenever
/// `connections_updated` is set to true.
///
/// On Linux, when using epoll, this is not used.
std::vector<zmq::pollitem_t> pollitems;
/// On Linux, when using epoll, this tracks the epoll file descriptor. Otherwise it does
/// nothing.
int epoll_fd = -1;
/// Rebuilds pollitems to include the internal sockets + all incoming/outgoing sockets.
void rebuild_pollitems();

@@ -11,6 +11,10 @@ extern "C" {
}
#endif
#ifdef OXENMQ_USE_EPOLL
#include <sys/epoll.h>
#endif
#ifndef _WIN32
extern "C" {
#include <sys/stat.h>
@@ -496,6 +500,12 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
// General vector for handling incoming messages:
std::vector<zmq::message_t> parts;
std::vector<std::pair<const int64_t, zmq::socket_t>*> queue; // Used as a circular buffer
#ifdef OXENMQ_USE_EPOLL
std::vector<struct epoll_event> evs;
#endif
while (true) {
std::chrono::milliseconds poll_timeout;
if (max_workers == 0) { // Will be 0 only if we are quitting
@@ -509,9 +519,48 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
poll_timeout = std::chrono::milliseconds{zmq_timers_timeout(timers.get())};
}
if (connections_updated)
if (connections_updated) {
rebuild_pollitems();
// If we just rebuilt the queue then do a full check of everything, because we might
// have sockets that already edge-triggered that we need to fully drain before we start
// polling.
proxy_skip_one_poll = true;
}
// We round-robin connections when pulling off pending messages one-by-one rather than
// pulling off all messages from one connection before moving to the next; thus in cases of
// contention we end up fairly distributing.
queue.reserve(connections.size() + 1);
#ifdef OXENMQ_USE_EPOLL
bool process_control = false, process_worker = false, process_zap = false, process_all = false;
if (proxy_skip_one_poll) {
proxy_skip_one_poll = false;
process_all = true;
}
else {
OMQ_TRACE("polling for new messages via epoll");
evs.resize(3 + connections.size());
const int max = epoll_wait(epoll_fd, evs.data(), evs.size(), poll_timeout.count());
queue.clear();
for (int i = 0; i < max; i++) {
const auto conn_id = evs[i].data.u64;
if (conn_id == EPOLL_COMMAND_ID)
process_control = true;
else if (conn_id == EPOLL_WORKER_ID)
process_worker = true;
else if (conn_id == EPOLL_ZAP_ID)
process_zap = true;
else if (auto it = connections.find(conn_id); it != connections.end())
queue.push_back(&*it);
}
queue.push_back(nullptr);
}
#else
if (proxy_skip_one_poll)
proxy_skip_one_poll = false;
else {
@@ -524,23 +573,32 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
zmq::poll(pollitems.data(), pollitems.size(), poll_timeout);
}
OMQ_TRACE("processing control messages");
// Retrieve any waiting incoming control messages
while (size_t len = recv_message_parts(command, control_parts, zmq::recv_flags::dontwait)) {
proxy_control_message(control_parts, len);
constexpr bool process_control = true, process_worker = true, process_zap = true, process_all = true;
#endif
if (process_control || process_all) {
OMQ_TRACE("processing control messages");
// Retrieve any waiting incoming control messages
while (size_t len = recv_message_parts(command, control_parts, zmq::recv_flags::dontwait)) {
proxy_control_message(control_parts, len);
}
}
OMQ_TRACE("processing worker messages");
while (size_t len = recv_message_parts(workers_socket, control_parts, zmq::recv_flags::dontwait)) {
proxy_worker_message(control_parts, len);
if (process_worker || process_all) {
OMQ_TRACE("processing worker messages");
while (size_t len = recv_message_parts(workers_socket, control_parts, zmq::recv_flags::dontwait)) {
proxy_worker_message(control_parts, len);
}
}
OMQ_TRACE("processing timers");
zmq_timers_execute(timers.get());
// Handle any zap authentication
OMQ_TRACE("processing zap requests");
process_zap_requests();
if (process_zap || process_all) {
// Handle any zap authentication
OMQ_TRACE("processing zap requests");
process_zap_requests();
}
// See if we can drain anything from the current queue before we potentially add to it
// below.
@@ -548,15 +606,14 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
proxy_process_queue();
OMQ_TRACE("processing new incoming messages");
if (process_all) {
queue.resize(connections.size() + 1);
int i = 0;
for (auto& id_sock : connections)
queue[i++] = &id_sock;
queue[i] = nullptr;
}
// We round-robin connections when pulling off pending messages one-by-one rather than
// pulling off all messages from one connection before moving to the next; thus in cases of
// contention we end up fairly distributing.
std::vector<std::pair<const int64_t, zmq::socket_t>*> queue; // Used as a circular buffer
queue.reserve(connections.size() + 1);
for (auto& id_sock : connections)
queue.push_back(&id_sock);
queue.push_back(nullptr);
size_t end = queue.size() - 1;
for (size_t pos = 0; pos != end; ++pos %= queue.size()) {
@@ -580,8 +637,8 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
proxy_to_worker(id, sock, parts);
if (connections_updated) {
// If connections got updated then our points are stale, to restart the proxy loop;
// if there are still messages waiting we'll end up right back here.
// If connections got updated then our points are stale, so restart the proxy loop;
// we'll immediately end up right back here at least once before we resume polling.
OMQ_TRACE("connections became stale; short-circuiting incoming message loop");
break;
}