Fix zmq socket limit setting

MAX_SOCKETS wasn't being applied at all because ZMQ only reads it when the
context is initialized, which happens when the first socket is constructed
on that context.

For OxenMQ, several member sockets were constructed on the context during
OxenMQ construction, which meant the context_t was initialized then, rather
than during start(), and so setting MAX_SOCKETS had no effect: you'd always
get the default.

This fixes it by making all the member zmq::socket_t's default-constructed,
then replacing them with properly constructed sockets during startup (i.e.
from start() and the proxy thread it launches), so that zmq::context_t
initialization is also deferred to the right place.
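
A minimal standalone cppzmq sketch of that initialization behaviour
(illustrative code, not part of this commit; the context/socket names are
made up):

    #include <zmq.hpp>

    int main() {
        // Too late: constructing a socket initializes the context, and the
        // context's socket table is sized during that initialization.
        zmq::context_t late_ctx;
        zmq::socket_t first{late_ctx, zmq::socket_type::dealer};
        late_ctx.set(zmq::ctxopt::max_sockets, 50);  // stored, but no longer applied

        // Correct order: set max_sockets before any socket_t exists on the context.
        zmq::context_t ctx;
        ctx.set(zmq::ctxopt::max_sockets, 50);
        zmq::socket_t sock{ctx, zmq::socket_type::dealer};  // limit now in effect
    }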

A second issue found during testing (also fixed here) is that creating the
sockets worker threads use to communicate with the proxy could fail if doing
so would violate the zmq max sockets limit, which wound up throwing an
uncaught exception and aborting.  This pre-initializes (but doesn't connect)
all potential worker thread sockets during start() so that a
lazily-initialized worker thread will have one already set up rather than
having to create a new one (which could fail); see the sketch below.
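
The failure mode this avoids, again as a standalone sketch (assumes cppzmq;
the tiny limit here just forces the error):

    #include <vector>
    #include <zmq.hpp>

    int main() {
        zmq::context_t ctx;
        ctx.set(zmq::ctxopt::max_sockets, 4);  // tiny limit to force the failure

        std::vector<zmq::socket_t> socks;
        try {
            // Once the context's socket table is full, the next socket_t
            // constructor throws zmq::error_t (EMFILE, "Too many open files").
            for (int i = 0; i < 100; i++)
                socks.emplace_back(ctx, zmq::socket_type::dealer);
        } catch (const zmq::error_t&) {
            // Recoverable here; in a worker thread with no handler the
            // exception would escape and std::terminate() the process.
        }
    }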
Jason Rhinelander 2022-08-04 23:47:54 -03:00
parent c854046684
commit edcde9246a
8 changed files with 145 additions and 63 deletions

View File

@@ -17,7 +17,7 @@ cmake_minimum_required(VERSION 3.7)
 set(CMAKE_OSX_DEPLOYMENT_TARGET 10.12 CACHE STRING "macOS deployment target (Apple clang only)")
 project(liboxenmq
-    VERSION 1.2.12
+    VERSION 1.2.13
     LANGUAGES CXX C)
 include(GNUInstallDirs)

View File

@@ -1,6 +1,7 @@
 #include "oxenmq.h"
 #include "oxenmq-internal.h"
 #include <oxenc/hex.h>
+#include <optional>

 namespace oxenmq {
@@ -156,10 +157,11 @@ OxenMQ::proxy_connect_sn(std::string_view remote, std::string_view connect_hint,
     }

     OMQ_LOG(debug, oxenc::to_hex(pubkey), " (me) connecting to ", addr, " to reach ", oxenc::to_hex(remote));
-    zmq::socket_t socket{context, zmq::socket_type::dealer};
-    setup_outgoing_socket(socket, remote, use_ephemeral_routing_id);
+    std::optional<zmq::socket_t> socket;
     try {
-        socket.connect(addr);
+        socket.emplace(context, zmq::socket_type::dealer);
+        setup_outgoing_socket(*socket, remote, use_ephemeral_routing_id);
+        socket->connect(addr);
     } catch (const zmq::error_t& e) {
         // Note that this failure cases indicates something serious went wrong that means zmq isn't
         // even going to try connecting (for example an unparseable remote address).
@@ -175,7 +177,7 @@ OxenMQ::proxy_connect_sn(std::string_view remote, std::string_view connect_hint,
     p.activity();
     connections_updated = true;
     outgoing_sn_conns.emplace_hint(outgoing_sn_conns.end(), p.conn_id, ConnectionID{remote});
-    auto it = connections.emplace_hint(connections.end(), p.conn_id, std::move(socket));
+    auto it = connections.emplace_hint(connections.end(), p.conn_id, *std::move(socket));
     return {&it->second, ""s};
 }
@@ -321,10 +323,11 @@ void OxenMQ::proxy_connect_remote(oxenc::bt_dict_consumer data) {
     OMQ_LOG(debug, "Establishing remote connection to ", remote,
             remote_pubkey.empty() ? " (NULL auth)" : " via CURVE expecting pubkey " + oxenc::to_hex(remote_pubkey));
-    zmq::socket_t sock{context, zmq::socket_type::dealer};
+    std::optional<zmq::socket_t> sock;
     try {
-        setup_outgoing_socket(sock, remote_pubkey, ephemeral_rid);
-        sock.connect(remote);
+        sock.emplace(context, zmq::socket_type::dealer);
+        setup_outgoing_socket(*sock, remote_pubkey, ephemeral_rid);
+        sock->connect(remote);
     } catch (const zmq::error_t &e) {
         proxy_schedule_reply_job([conn_id, on_failure=std::move(on_failure), what="connect() failed: "s+e.what()] {
             on_failure(conn_id, std::move(what));
@@ -332,7 +335,7 @@ void OxenMQ::proxy_connect_remote(oxenc::bt_dict_consumer data) {
         return;
     }
-    auto &s = connections.emplace_hint(connections.end(), conn_id, std::move(sock))->second;
+    auto &s = connections.emplace_hint(connections.end(), conn_id, std::move(*sock))->second;
     connections_updated = true;
     OMQ_LOG(debug, "Opened new zmq socket to ", remote, ", conn_id ", conn_id, "; sending HI");
     send_direct_message(s, "HI");

View File

@@ -2,6 +2,7 @@
 #include "oxenmq-internal.h"
 #include "zmq.hpp"
 #include <map>
+#include <mutex>
 #include <random>
 #include <ostream>
 #include <thread>
@@ -236,15 +237,42 @@ void OxenMQ::start() {
     OMQ_LOG(info, "Initializing OxenMQ ", bind.empty() ? "remote-only" : "listener", " with pubkey ", oxenc::to_hex(pubkey));

-    int zmq_socket_limit = context.get(zmq::ctxopt::socket_limit);
-    if (MAX_SOCKETS > 1 && MAX_SOCKETS <= zmq_socket_limit)
-        context.set(zmq::ctxopt::max_sockets, MAX_SOCKETS);
-    else
-        OMQ_LOG(error, "Not applying OxenMQ::MAX_SOCKETS setting: ", MAX_SOCKETS, " must be in [1, ", zmq_socket_limit, "]");
+    assert(general_workers > 0);
+    if (batch_jobs_reserved < 0)
+        batch_jobs_reserved = (general_workers + 1) / 2;
+    if (reply_jobs_reserved < 0)
+        reply_jobs_reserved = (general_workers + 7) / 8;
+
+    max_workers = general_workers + batch_jobs_reserved + reply_jobs_reserved;
+    for (const auto& cat : categories) {
+        max_workers += cat.second.reserved_threads;
+    }
+
+    if (log_level() >= LogLevel::debug) {
+        OMQ_LOG(debug, "Reserving space for ", max_workers, " max workers = ", general_workers, " general plus reservations for:");
+        for (const auto& cat : categories)
+            OMQ_LOG(debug, " - ", cat.first, ": ", cat.second.reserved_threads);
+        OMQ_LOG(debug, " - (batch jobs): ", batch_jobs_reserved);
+        OMQ_LOG(debug, " - (reply jobs): ", reply_jobs_reserved);
+        OMQ_LOG(debug, "Plus ", tagged_workers.size(), " tagged worker threads");
+    }
+
+    if (MAX_SOCKETS != 0) {
+        // The max sockets setting we apply to the context here is used during zmq context
+        // initialization, which happens when the first socket is constructed using this context:
+        // hence we set this *before* constructing any socket_t on the context.
+        int zmq_socket_limit = context.get(zmq::ctxopt::socket_limit);
+        int want_sockets = MAX_SOCKETS < 0 ? zmq_socket_limit :
+            std::min<int>(zmq_socket_limit,
+                    MAX_SOCKETS + max_workers + tagged_workers.size()
+                    + 4 /* zap_auth, workers_socket, command, inproc_listener */);
+        context.set(zmq::ctxopt::max_sockets, want_sockets);
+    }

     // We bind `command` here so that the `get_control_socket()` below is always connecting to a
     // bound socket, but we do nothing else here: the proxy thread is responsible for everything
     // except binding it.
+    command = zmq::socket_t{context, zmq::socket_type::router};
     command.bind(SN_ADDR_COMMAND);

     std::promise<void> startup_prom;
     auto proxy_startup = startup_prom.get_future();
@@ -399,23 +427,13 @@ OxenMQ::run_info& OxenMQ::run_info::load(batch_job&& bj, bool reply_job, int tag
 OxenMQ::~OxenMQ() {
     if (!proxy_thread.joinable()) {
         if (!tagged_workers.empty()) {
-            // This is a bit icky: we have tagged workers that are waiting for a signal on
-            // workers_socket, but the listening end of workers_socket doesn't get set up until the
-            // proxy thread starts (and we're getting destructed here without a proxy thread). So
-            // we need to start listening on it here in the destructor so that we establish a
-            // connection and send the QUITs to the tagged worker threads.
-            workers_socket.set(zmq::sockopt::router_mandatory, true);
-            workers_socket.bind(SN_ADDR_WORKERS);
-            for (auto& [run, busy, queue] : tagged_workers) {
-                while (true) {
-                    try {
-                        route_control(workers_socket, run.worker_routing_id, "QUIT");
-                        break;
-                    } catch (const zmq::error_t&) {
-                        std::this_thread::sleep_for(5ms);
-                    }
-                }
-            }
+            // We have tagged workers that are waiting on a signal for startup, but we didn't start
+            // up, so signal them so that they can end themselves.
+            {
+                std::lock_guard lock{tagged_startup_mutex};
+                tagged_go = true;
+            }
+            tagged_cv.notify_all();
+
             for (auto& [run, busy, queue] : tagged_workers)
                 run.worker_thread.join();
         }

View File

@@ -28,6 +28,7 @@
 #pragma once

+#include <condition_variable>
 #include <string>
 #include <string_view>
 #include <list>
@@ -237,7 +238,9 @@ public:
     /** Maximum open sockets, passed to the ZMQ context during start(). The default here is 10k,
      * designed to be enough to be more than enough to allow a full-mesh SN layer connection if
-     * necessary for the forseeable future. */
+     * necessary for the forseeable future. The actual value passed to ZMQ will be slightly higher,
+     * to allow for internal inter-thread communication sockets. Set to 0 to explicitly avoid
+     * setting the value; set to -1 to use the maximum supported by ZMQ. */
     int MAX_SOCKETS = 10000;

     /** Minimum reconnect interval: when a connection fails or dies, wait this long before
@@ -332,7 +335,7 @@ private:
     /// The socket we listen on for handling ZAP authentication requests (the other end is internal
     /// to zmq which sends requests to us as needed).
-    zmq::socket_t zap_auth{context, zmq::socket_type::rep};
+    zmq::socket_t zap_auth;

     struct bind_data {
         std::string address;
@@ -436,7 +439,7 @@
     /// internal "control" connection (returned by `get_control_socket()`) to this socket used to
     /// give instructions to the proxy such as instructing it to initiate a connection to a remote
     /// or send a message.
-    zmq::socket_t command{context, zmq::socket_type::router};
+    zmq::socket_t command;

     /// Timers. TODO: once cppzmq adds an interface around the zmq C timers API then switch to it.
     struct TimersDeleter { void operator()(void* timers); };
@@ -455,7 +458,7 @@ public:
 private:
     /// Router socket to reach internal worker threads from proxy
-    zmq::socket_t workers_socket{context, zmq::socket_type::router};
+    zmq::socket_t workers_socket;

     /// indices of idle, active workers; note that this vector is usually oversized
     std::vector<unsigned int> idle_workers;
@@ -474,7 +477,7 @@
     int active_workers() const { return workers.size() - idle_worker_count; }

     /// Worker thread loop. Tagged and start are provided for a tagged worker thread.
-    void worker_thread(unsigned int index, std::optional<std::string> tagged = std::nullopt, std::function<void()> start = nullptr);
+    void worker_thread(unsigned int index, std::optional<std::string> tagged, std::function<void()> start);

     /// If set, skip polling for one proxy loop iteration (set when we know we have something
     /// processible without having to shove it onto a socket, such as scheduling an internal job).
@@ -771,11 +774,23 @@
     /// change it.
     std::vector<run_info> workers;

+    /// Dealer sockets for workers to use to talk to the proxy thread. These are initialized during
+    /// start(), and after that belong exclusively to the worker thread with the same index as used
+    /// in `workers`.
+    std::vector<zmq::socket_t> worker_sockets;
+
     /// Workers that are reserved for tagged thread tasks (as created with add_tagged_thread). The
     /// queue here is similar to worker_jobs, but contains only the tagged thread's jobs. The bool
     /// is whether the worker is currently busy (true) or available (false).
     std::vector<std::tuple<run_info, bool, batch_queue>> tagged_workers;

+    /// Startup signalling for tagged workers; the tagged threads get initialized before startup,
+    /// then wait via this bool/c.v. to synchronize startup with the proxy thread. This mutex isn't
+    /// used after startup is complete.
+    std::mutex tagged_startup_mutex;
+    bool tagged_go{false};
+    std::condition_variable tagged_cv;
+
 public:
     /**
      * OxenMQ constructor. This constructs the object but does not start it; you will typically

View File

@@ -377,36 +377,23 @@ void OxenMQ::proxy_loop_init() {
     pthread_setname_np("omq-proxy");
 #endif

+    zap_auth = zmq::socket_t{context, zmq::socket_type::rep};
     zap_auth.set(zmq::sockopt::linger, 0);
     zap_auth.bind(ZMQ_ADDR_ZAP);

+    workers_socket = zmq::socket_t{context, zmq::socket_type::router};
     workers_socket.set(zmq::sockopt::router_mandatory, true);
     workers_socket.bind(SN_ADDR_WORKERS);

-    assert(general_workers > 0);
-    if (batch_jobs_reserved < 0)
-        batch_jobs_reserved = (general_workers + 1) / 2;
-    if (reply_jobs_reserved < 0)
-        reply_jobs_reserved = (general_workers + 7) / 8;
-
-    max_workers = general_workers + batch_jobs_reserved + reply_jobs_reserved;
-    for (const auto& cat : categories) {
-        max_workers += cat.second.reserved_threads;
-    }
-
-    if (log_level() >= LogLevel::debug) {
-        OMQ_LOG(debug, "Reserving space for ", max_workers, " max workers = ", general_workers, " general plus reservations for:");
-        for (const auto& cat : categories)
-            OMQ_LOG(debug, " - ", cat.first, ": ", cat.second.reserved_threads);
-        OMQ_LOG(debug, " - (batch jobs): ", batch_jobs_reserved);
-        OMQ_LOG(debug, " - (reply jobs): ", reply_jobs_reserved);
-        OMQ_LOG(debug, "Plus ", tagged_workers.size(), " tagged worker threads");
-    }

     workers.reserve(max_workers);
     idle_workers.resize(max_workers);
-    if (!workers.empty())
+    if (!workers.empty() || !worker_sockets.empty())
         throw std::logic_error("Internal error: proxy thread started with active worker threads");
+    worker_sockets.reserve(max_workers);
+    // Pre-initialize these worker sockets rather than creating during thread initialization so that
+    // we can't hit the zmq socket limit during worker thread startup.
+    for (int i = 0; i < max_workers; i++)
+        worker_sockets.emplace_back(context, zmq::socket_type::dealer);

 #ifndef _WIN32
     int saved_umask = -1;
@@ -466,6 +453,11 @@
     // synchronization dance to guarantee that the workers are routable before we can proceed.
     if (!tagged_workers.empty()) {
         OMQ_LOG(debug, "Waiting for tagged workers");
+        {
+            std::unique_lock lock{tagged_startup_mutex};
+            tagged_go = true;
+        }
+        tagged_cv.notify_all();
         std::unordered_set<std::string_view> waiting_on;
         for (auto& w : tagged_workers)
             waiting_on.emplace(std::get<run_info>(w).worker_routing_id);

View File

@@ -62,7 +62,22 @@ void OxenMQ::worker_thread(unsigned int index, std::optional<std::string> tagged
     pthread_setname_np(thread_name.c_str());
 #endif

-    zmq::socket_t sock{context, zmq::socket_type::dealer};
+    std::optional<zmq::socket_t> tagged_socket;
+    if (tagged) {
+        // If we're a tagged worker then we got started up before OxenMQ started, so we need to wait
+        // for an all-clear signal from OxenMQ first, then we fire our `start` callback, then we can
+        // start waiting for commands in the main loop further down. (We also can't get the
+        // reference to our `tagged_workers` element or create a socket until the main proxy thread
+        // is running).
+        {
+            std::unique_lock lock{tagged_startup_mutex};
+            tagged_cv.wait(lock, [this] { return tagged_go; });
+        }
+        if (!proxy_thread.joinable()) // OxenMQ destroyed without starting
+            return;
+        tagged_socket.emplace(context, zmq::socket_type::dealer);
+    }
+    auto& sock = tagged ? *tagged_socket : worker_sockets[index];
     sock.set(zmq::sockopt::routing_id, routing_id);
     OMQ_LOG(debug, "New worker thread ", worker_id, " (", routing_id, ") started");
     sock.connect(SN_ADDR_WORKERS);
@@ -74,11 +89,6 @@ void OxenMQ::worker_thread(unsigned int index, std::optional<std::string> tagged
     bool waiting_for_command;
     if (tagged) {
-        // If we're a tagged worker then we got started up before OxenMQ started, so we need to wait
-        // for an all-clear signal from OxenMQ first, then we fire our `start` callback, then we can
-        // start waiting for commands in the main loop further down. (We also can't get the
-        // reference to our `tagged_workers` element until the main proxy threads is running).
         waiting_for_command = true;
         if (!worker_wait_for(*this, sock, parts, worker_id, "START"sv))
@@ -268,7 +278,7 @@ void OxenMQ::proxy_worker_message(OxenMQ::control_message_array& parts, size_t l
 void OxenMQ::proxy_run_worker(run_info& run) {
     if (!run.worker_thread.joinable())
-        run.worker_thread = std::thread{[this, id=run.worker_id] { worker_thread(id); }};
+        run.worker_thread = std::thread{&OxenMQ::worker_thread, this, run.worker_id, std::nullopt, nullptr};
     else
         send_routed_message(workers_socket, run.worker_routing_id, "RUN");
 }

View File

@@ -10,6 +10,7 @@ add_executable(tests
     test_failures.cpp
     test_inject.cpp
    test_requests.cpp
+    test_socket_limit.cpp
    test_tagged_threads.cpp
    test_timer.cpp
)

View File

@@ -0,0 +1,43 @@
+#include "common.h"
+#include <oxenc/hex.h>
+
+using namespace oxenmq;
+
+TEST_CASE("zmq socket limit", "[zmq][socket-limit]") {
+    // Make sure setting .MAX_SOCKETS works as expected. (This test was added when a bug was fixed
+    // that was causing it not to be applied).
+    std::string listen = random_localhost();
+    OxenMQ server{
+        "", "", // generate ephemeral keys
+        false, // not a service node
+        [](auto) { return ""; },
+    };
+    server.listen_plain(listen);
+    server.start();
+
+    std::atomic<int> failed = 0, good = 0, failed_toomany = 0;
+    OxenMQ client;
+    client.MAX_SOCKETS = 15;
+    client.start();
+    std::vector<ConnectionID> conns;
+    address server_addr{listen};
+    for (int i = 0; i < 16; i++)
+        client.connect_remote(server_addr,
+                [&](auto) { good++; },
+                [&](auto cid, auto msg) {
+                    if (msg == "connect() failed: Too many open files")
+                        failed_toomany++;
+                    else
+                        failed++;
+                });
+
+    wait_for([&] { return good > 0 && failed_toomany > 0; });
+    {
+        auto lock = catch_lock();
+        REQUIRE( good > 0 );
+        REQUIRE( failed == 0 );
+        REQUIRE( failed_toomany > 0 );
+    }
+}