Merge remote-tracking branch 'origin/stable' into debian/bullseye

commit 3c90b0db91 by Jason Rhinelander, 2023-09-28 11:10:52 -03:00
GPG Key ID: C4992CE7A88D4262 (no known key found for this signature in database)
7 changed files with 173 additions and 26 deletions

View File

@@ -17,7 +17,7 @@ cmake_minimum_required(VERSION 3.7)
set(CMAKE_OSX_DEPLOYMENT_TARGET 10.12 CACHE STRING "macOS deployment target (Apple clang only)")
project(liboxenmq
VERSION 1.2.14
VERSION 1.2.16
LANGUAGES CXX C)
include(GNUInstallDirs)
@@ -39,9 +39,15 @@ set(oxenmq_INSTALL_DEFAULT OFF)
if(BUILD_SHARED_LIBS OR oxenmq_IS_TOPLEVEL_PROJECT)
set(oxenmq_INSTALL_DEFAULT ON)
endif()
set(oxenmq_EPOLL_DEFAULT OFF)
if(CMAKE_SYSTEM_NAME STREQUAL "Linux" AND NOT CMAKE_CROSSCOMPILING)
set(oxenmq_EPOLL_DEFAULT ON)
endif()
option(OXENMQ_BUILD_TESTS "Build and perform oxenmq tests" ${oxenmq_IS_TOPLEVEL_PROJECT})
option(OXENMQ_INSTALL "Add oxenmq libraries and headers to cmake install target; defaults to ON if BUILD_SHARED_LIBS is enabled or we are the top-level project; OFF for a static subdirectory build" ${oxenmq_INSTALL_DEFAULT})
option(OXENMQ_INSTALL_CPPZMQ "Install cppzmq header with oxenmq/ headers (requires OXENMQ_INSTALL)" ON)
option(OXENMQ_USE_EPOLL "Use epoll for socket polling (requires Linux)" ${oxenmq_EPOLL_DEFAULT})
list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake")
@@ -59,6 +65,9 @@ add_library(oxenmq
oxenmq/worker.cpp
)
set_target_properties(oxenmq PROPERTIES SOVERSION ${OXENMQ_LIBVERSION})
if(OXENMQ_USE_EPOLL)
target_compile_definitions(oxenmq PRIVATE OXENMQ_USE_EPOLL)
endif()
set(THREADS_PREFER_PTHREAD_FLAG ON)
find_package(Threads REQUIRED)

@@ -1 +1 @@
Subproject commit 707a83609fb64d09b61ed1e56c82bf692050d2a1
Subproject commit 462be41bd481b331dabeb3c220b349ef35c89e56

View File

@@ -3,12 +3,47 @@
#include <oxenc/hex.h>
#include <optional>
#ifdef OXENMQ_USE_EPOLL
extern "C" {
#include <sys/epoll.h>
#include <unistd.h>
}
#endif
namespace oxenmq {
std::ostream& operator<<(std::ostream& o, const ConnectionID& conn) {
return o << conn.to_string();
}
#ifdef OXENMQ_USE_EPOLL
void OxenMQ::rebuild_pollitems() {
if (epoll_fd != -1)
close(epoll_fd);
epoll_fd = epoll_create1(0);
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET;
ev.data.u64 = EPOLL_COMMAND_ID;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, command.get(zmq::sockopt::fd), &ev);
ev.data.u64 = EPOLL_WORKER_ID;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, workers_socket.get(zmq::sockopt::fd), &ev);
ev.data.u64 = EPOLL_ZAP_ID;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, zap_auth.get(zmq::sockopt::fd), &ev);
for (auto& [id, s] : connections) {
ev.data.u64 = id;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, s.get(zmq::sockopt::fd), &ev);
}
connections_updated = false;
}
#else // !OXENMQ_USE_EPOLL
namespace {
void add_pollitem(std::vector<zmq::pollitem_t>& pollitems, zmq::socket_t& sock) {
@@ -33,6 +68,8 @@ void OxenMQ::rebuild_pollitems() {
connections_updated = false;
}
#endif // OXENMQ_USE_EPOLL
void OxenMQ::setup_external_socket(zmq::socket_t& socket) {
socket.set(zmq::sockopt::reconnect_ivl, (int) RECONNECT_INTERVAL.count());
socket.set(zmq::sockopt::reconnect_ivl_max, (int) RECONNECT_INTERVAL_MAX.count());
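
Under epoll, rebuild_pollitems() registers the file descriptor that each ZMQ socket exposes as edge-triggered (EPOLLIN | EPOLLET) and stores a 64-bit tag in ev.data.u64, so a wakeup maps directly back to the command/worker/ZAP sockets or to an entry in connections. A minimal standalone sketch of that register-and-dispatch pattern (illustrative names only; error handling and the real OxenMQ members are omitted):

extern "C" {
#include <sys/epoll.h>
#include <unistd.h>
}
#include <cstdint>
#include <map>

int main() {
    std::map<uint64_t, int> fds;  // tag -> file descriptor, populated elsewhere
    int epfd = epoll_create1(0);
    for (auto& [tag, fd] : fds) {
        struct epoll_event ev = {};
        ev.events = EPOLLIN | EPOLLET;  // edge-triggered: fires on state changes only
        ev.data.u64 = tag;              // handed back verbatim by epoll_wait
        epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
    }
    struct epoll_event evs[16];
    int n = epoll_wait(epfd, evs, 16, 1000 /* ms timeout */);
    for (int i = 0; i < n; i++) {
        uint64_t tag = evs[i].data.u64;  // dispatch on the tag, as proxy.cpp does below
        (void) tag;
    }
    close(epfd);
}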

View File

@@ -1,4 +1,5 @@
#pragma once
#include <limits>
#include "oxenmq.h"
@@ -20,6 +21,14 @@ constexpr char SN_ADDR_WORKERS[] = "inproc://sn-workers";
constexpr char SN_ADDR_SELF[] = "inproc://sn-self";
constexpr char ZMQ_ADDR_ZAP[] = "inproc://zeromq.zap.01";
#ifdef OXENMQ_USE_EPOLL
constexpr auto EPOLL_COMMAND_ID = std::numeric_limits<uint64_t>::max();
constexpr auto EPOLL_WORKER_ID = std::numeric_limits<uint64_t>::max() - 1;
constexpr auto EPOLL_ZAP_ID = std::numeric_limits<uint64_t>::max() - 2;
#endif
/// Destructor for create_message(std::string&&) that zmq calls when it's done with the message.
extern "C" inline void message_buffer_destroy(void*, void* hint) {
delete reinterpret_cast<std::string*>(hint);
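
The three EPOLL_*_ID sentinels sit at the very top of the uint64_t range so that they can never collide with a real connection id stored in the same epoll data.u64 slot: connection ids are int64_t values (see the connections map type in proxy.cpp below), so they always stay below 2^63. A compile-time sketch of that non-collision argument:

#include <cstdint>
#include <limits>

constexpr auto EPOLL_COMMAND_ID = std::numeric_limits<uint64_t>::max();
constexpr auto EPOLL_WORKER_ID  = std::numeric_limits<uint64_t>::max() - 1;
constexpr auto EPOLL_ZAP_ID     = std::numeric_limits<uint64_t>::max() - 2;

// A non-negative int64_t connection id is < 2^63, far below the three sentinels:
static_assert(EPOLL_ZAP_ID > uint64_t{std::numeric_limits<int64_t>::max()},
              "sentinels cannot collide with int64-sized connection ids");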

View File

@@ -460,11 +460,12 @@ std::ostream &operator<<(std::ostream &os, LogLevel lvl) {
std::string make_random_string(size_t size) {
static thread_local std::mt19937_64 rng{std::random_device{}()};
static thread_local std::uniform_int_distribution<char> dist{std::numeric_limits<char>::min(), std::numeric_limits<char>::max()};
std::string rando;
rando.reserve(size);
for (size_t i = 0; i < size; i++)
rando += dist(rng);
while (rando.size() < size) {
uint64_t x = rng();
rando.append(reinterpret_cast<const char*>(&x), std::min<size_t>(size - rando.size(), 8));
}
return rando;
}
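
The rewritten make_random_string fills the string eight bytes at a time from a single mt19937_64 call instead of one byte per call through std::uniform_int_distribution<char>; beyond being faster, char is not among the integer types the standard permits for uniform_int_distribution, so the old code relied on unspecified library behavior (a plausible motivation for the change, not stated in the commit). Self-contained sketch of the new loop:

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <random>
#include <string>

std::string make_random_string(size_t size) {
    static thread_local std::mt19937_64 rng{std::random_device{}()};
    std::string rando;
    rando.reserve(size);
    while (rando.size() < size) {
        uint64_t x = rng();  // 64 fresh random bits
        // Append up to 8 raw bytes of x; the final block is truncated to the exact size:
        rando.append(reinterpret_cast<const char*>(&x), std::min<size_t>(size - rando.size(), 8));
    }
    return rando;
}

int main() { return make_random_string(20).size() == 20 ? 0 : 1; }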

View File

@@ -420,8 +420,14 @@ private:
/// sockets for inter-thread communication followed by a pollitem for every connection (both
/// incoming and outgoing) in `connections`. We rebuild this from `connections` whenever
/// `connections_updated` is set to true.
///
/// On Linux, when using epoll, this is not used.
std::vector<zmq::pollitem_t> pollitems;
/// On Linux, when using epoll, this tracks the epoll file descriptor. Otherwise it is
/// unused.
int epoll_fd = -1;
/// Rebuilds pollitems to include the internal sockets + all incoming/outgoing sockets.
void rebuild_pollitems();
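
On the non-epoll path, each pollitems entry wraps a socket handle for zmq::poll(). The add_pollitem helper whose signature appears in the connections.cpp hunk above has its body elided by the diff; a plausible shape for it (an assumption, not the verbatim source) is:

#include <vector>
#include <zmq.hpp>

void add_pollitem(std::vector<zmq::pollitem_t>& pollitems, zmq::socket_t& sock) {
    pollitems.emplace_back();
    auto& p = pollitems.back();
    p.socket = static_cast<void*>(sock);  // zmq::poll() keys on the socket handle, not an fd
    p.events = ZMQ_POLLIN;
}

After zmq::poll(pollitems.data(), pollitems.size(), timeout) returns, readiness shows up in each item's revents field; the epoll build replaces all of this with the single epoll_fd above.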

View File

@@ -11,6 +11,10 @@ extern "C" {
}
#endif
#ifdef OXENMQ_USE_EPOLL
#include <sys/epoll.h>
#endif
#ifndef _WIN32
extern "C" {
#include <sys/stat.h>
@@ -496,6 +500,12 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
// General vector for handling incoming messages:
std::vector<zmq::message_t> parts;
std::vector<std::pair<const int64_t, zmq::socket_t>*> queue; // Used as a circular buffer
#ifdef OXENMQ_USE_EPOLL
std::vector<struct epoll_event> evs;
#endif
while (true) {
std::chrono::milliseconds poll_timeout;
if (max_workers == 0) { // Will be 0 only if we are quitting
@@ -509,9 +519,52 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
poll_timeout = std::chrono::milliseconds{zmq_timers_timeout(timers.get())};
}
if (connections_updated)
if (connections_updated) {
rebuild_pollitems();
// If we just rebuilt the queue then do a full check of everything, because we might
// have sockets that have already edge-triggered and need to be fully drained before we
// start polling.
proxy_skip_one_poll = true;
}
// We round-robin connections when pulling off pending messages one-by-one rather than
// pulling off all messages from one connection before moving to the next; thus in cases of
// contention we end up fairly distributing.
queue.reserve(connections.size() + 1);
#ifdef OXENMQ_USE_EPOLL
bool process_command = false, process_worker = false, process_zap = false, process_all = false;
if (proxy_skip_one_poll) {
proxy_skip_one_poll = false;
process_command = command.get(zmq::sockopt::events) & ZMQ_POLLIN;
process_worker = workers_socket.get(zmq::sockopt::events) & ZMQ_POLLIN;
process_zap = zap_auth.get(zmq::sockopt::events) & ZMQ_POLLIN;
process_all = true;
}
else {
OMQ_TRACE("polling for new messages via epoll");
evs.resize(3 + connections.size());
const int max = epoll_wait(epoll_fd, evs.data(), evs.size(), poll_timeout.count());
queue.clear();
for (int i = 0; i < max; i++) {
const auto conn_id = evs[i].data.u64;
if (conn_id == EPOLL_COMMAND_ID)
process_command = true;
else if (conn_id == EPOLL_WORKER_ID)
process_worker = true;
else if (conn_id == EPOLL_ZAP_ID)
process_zap = true;
else if (auto it = connections.find(conn_id); it != connections.end())
queue.push_back(&*it);
}
queue.push_back(nullptr);
}
#else
if (proxy_skip_one_poll)
proxy_skip_one_poll = false;
else {
@@ -524,23 +577,29 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
zmq::poll(pollitems.data(), pollitems.size(), poll_timeout);
}
OMQ_TRACE("processing control messages");
// Retrieve any waiting incoming control messages
while (size_t len = recv_message_parts(command, control_parts, zmq::recv_flags::dontwait)) {
proxy_control_message(control_parts, len);
constexpr bool process_command = true, process_worker = true, process_zap = true, process_all = true;
#endif
if (process_command) {
OMQ_TRACE("processing control messages");
while (size_t len = recv_message_parts(command, control_parts, zmq::recv_flags::dontwait))
proxy_control_message(control_parts, len);
}
OMQ_TRACE("processing worker messages");
while (size_t len = recv_message_parts(workers_socket, control_parts, zmq::recv_flags::dontwait)) {
proxy_worker_message(control_parts, len);
if (process_worker) {
OMQ_TRACE("processing worker messages");
while (size_t len = recv_message_parts(workers_socket, control_parts, zmq::recv_flags::dontwait))
proxy_worker_message(control_parts, len);
}
OMQ_TRACE("processing timers");
zmq_timers_execute(timers.get());
// Handle any zap authentication
OMQ_TRACE("processing zap requests");
process_zap_requests();
if (process_zap) {
// Handle any zap authentication
OMQ_TRACE("processing zap requests");
process_zap_requests();
}
// See if we can drain anything from the current queue before we potentially add to it
// below.
@@ -548,15 +607,14 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
proxy_process_queue();
OMQ_TRACE("processing new incoming messages");
if (process_all) {
queue.clear();
for (auto& id_sock : connections)
if (id_sock.second.get(zmq::sockopt::events) & ZMQ_POLLIN)
queue.push_back(&id_sock);
queue.push_back(nullptr);
}
// We round-robin connections when pulling off pending messages one-by-one rather than
// pulling off all messages from one connection before moving to the next; thus in cases of
// contention we end up fairly distributing.
std::vector<std::pair<const int64_t, zmq::socket_t>*> queue; // Used as a circular buffer
queue.reserve(connections.size() + 1);
for (auto& id_sock : connections)
queue.push_back(&id_sock);
queue.push_back(nullptr);
size_t end = queue.size() - 1;
for (size_t pos = 0; pos != end; ++pos %= queue.size()) {
@@ -580,13 +638,40 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
proxy_to_worker(id, sock, parts);
if (connections_updated) {
// If connections got updated then our pointers are stale, to restart the proxy loop;
// if there are still messages waiting we'll end up right back here.
// If connections got updated then our pointers are stale, so restart the proxy loop;
// we'll immediately end up right back here at least once before we resume polling.
OMQ_TRACE("connections became stale; short-circuiting incoming message loop");
break;
}
}
#ifdef OXENMQ_USE_EPOLL
// If any socket still has ZMQ_POLLIN (which is possible if something we did above changed
// state on another socket, perhaps by writing to it) then we need to repeat the loop
// *without* going back to epoll again, until we get through everything without any
// ZMQ_POLLIN sockets. If we didn't, we could miss it and might end up deadlocked because
// of ZMQ's edge-triggered notifications on zmq fd's.
//
// More info on the complexities here at https://github.com/zeromq/libzmq/issues/3641 and
// https://funcptr.net/2012/09/10/zeromq---edge-triggered-notification/
if (!connections_updated && !proxy_skip_one_poll) {
for (auto* s : {&command, &workers_socket, &zap_auth}) {
if (s->get(zmq::sockopt::events) & ZMQ_POLLIN) {
proxy_skip_one_poll = true;
break;
}
}
if (!proxy_skip_one_poll) {
for (auto& [id, sock] : connections) {
if (sock.get(zmq::sockopt::events) & ZMQ_POLLIN) {
proxy_skip_one_poll = true;
break;
}
}
}
}
#endif
OMQ_TRACE("done proxy loop");
}
}
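
The re-check block at the end of the loop matters because the fd that ZMQ exposes for epoll is edge-triggered at the libzmq level: it signals when a socket becomes readable and may never signal again until that socket is completely drained (the two links in the comment above cover the details). The resulting consume-until-empty pattern, sketched standalone with cppzmq (illustrative function name, assuming cppzmq's zmq::sockopt API):

#include <zmq.hpp>

// After an epoll wakeup, keep reading until ZMQ_POLLIN clears: with an
// edge-triggered fd, leaving even one queued message behind can mean no
// further wakeup ever arrives for this socket.
void drain_after_wakeup(zmq::socket_t& sock) {
    while (sock.get(zmq::sockopt::events) & ZMQ_POLLIN) {
        zmq::message_t msg;
        if (!sock.recv(msg, zmq::recv_flags::dontwait))
            break;
        // ... handle msg ...
    }
}

This is also why the loop sets proxy_skip_one_poll whenever any socket still reports ZMQ_POLLIN: the next iteration processes everything again without sleeping in epoll_wait.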