Merge pull request #79 from jagerman/socket-limits

Fix zmq socket limit setting
Jason Rhinelander, 2022-08-31 11:57:22 -03:00 (committed by GitHub)
commit 057685b7c0
GPG Key ID: 4AEE18F83AFDEB23 (no known key found for this signature in database)
9 changed files with 146 additions and 63 deletions

.drone.jsonnet

@@ -103,6 +103,7 @@ local full_llvm(version) = debian_pipeline(
     commands: [
       'mkdir build',
       'cd build',
+      'ulimit -n 1024', // Because macOS has a stupid tiny default ulimit
       'cmake .. -G Ninja -DCMAKE_CXX_FLAGS=-fcolor-diagnostics -DCMAKE_BUILD_TYPE=Release -DCMAKE_CXX_COMPILER_LAUNCHER=ccache',
       'ninja -v',
       './tests/tests --use-colour yes',

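The `ulimit -n 1024` added to the CI commands raises the launching shell's file descriptor limit before the tests run, since the new socket-limit test opens enough sockets to overflow macOS's tiny default. The same thing can be done in-process on POSIX systems; this is a hypothetical standalone sketch, not part of the commit:

// Hypothetical illustration: raise this process's soft RLIMIT_NOFILE to 1024
// (mirroring the CI's `ulimit -n 1024`) before opening many sockets.
#include <sys/resource.h>
#include <cstdio>

int main() {
    rlimit lim{};
    if (getrlimit(RLIMIT_NOFILE, &lim) != 0) {
        std::perror("getrlimit");
        return 1;
    }
    if (lim.rlim_cur < 1024 && lim.rlim_max >= 1024) {
        lim.rlim_cur = 1024;  // the soft limit may be raised up to the hard limit
        if (setrlimit(RLIMIT_NOFILE, &lim) != 0)
            std::perror("setrlimit");
    }
    std::printf("soft fd limit: %llu\n", (unsigned long long) lim.rlim_cur);
}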
CMakeLists.txt

@@ -17,7 +17,7 @@ cmake_minimum_required(VERSION 3.7)
 set(CMAKE_OSX_DEPLOYMENT_TARGET 10.12 CACHE STRING "macOS deployment target (Apple clang only)")

 project(liboxenmq
-    VERSION 1.2.12
+    VERSION 1.2.13
     LANGUAGES CXX C)

 include(GNUInstallDirs)

oxenmq/connections.cpp

@@ -1,6 +1,7 @@
 #include "oxenmq.h"
 #include "oxenmq-internal.h"
 #include <oxenc/hex.h>
+#include <optional>

 namespace oxenmq {
@@ -156,10 +157,11 @@ OxenMQ::proxy_connect_sn(std::string_view remote, std::string_view connect_hint,
     }

     OMQ_LOG(debug, oxenc::to_hex(pubkey), " (me) connecting to ", addr, " to reach ", oxenc::to_hex(remote));
-    zmq::socket_t socket{context, zmq::socket_type::dealer};
-    setup_outgoing_socket(socket, remote, use_ephemeral_routing_id);
+    std::optional<zmq::socket_t> socket;
     try {
-        socket.connect(addr);
+        socket.emplace(context, zmq::socket_type::dealer);
+        setup_outgoing_socket(*socket, remote, use_ephemeral_routing_id);
+        socket->connect(addr);
     } catch (const zmq::error_t& e) {
         // Note that this failure cases indicates something serious went wrong that means zmq isn't
         // even going to try connecting (for example an unparseable remote address).
@@ -175,7 +177,7 @@ OxenMQ::proxy_connect_sn(std::string_view remote, std::string_view connect_hint,
     p.activity();
     connections_updated = true;
     outgoing_sn_conns.emplace_hint(outgoing_sn_conns.end(), p.conn_id, ConnectionID{remote});
-    auto it = connections.emplace_hint(connections.end(), p.conn_id, std::move(socket));
+    auto it = connections.emplace_hint(connections.end(), p.conn_id, *std::move(socket));
     return {&it->second, ""s};
 }
@@ -321,10 +323,11 @@ void OxenMQ::proxy_connect_remote(oxenc::bt_dict_consumer data) {
     OMQ_LOG(debug, "Establishing remote connection to ", remote,
             remote_pubkey.empty() ? " (NULL auth)" : " via CURVE expecting pubkey " + oxenc::to_hex(remote_pubkey));
-    zmq::socket_t sock{context, zmq::socket_type::dealer};
+    std::optional<zmq::socket_t> sock;
     try {
-        setup_outgoing_socket(sock, remote_pubkey, ephemeral_rid);
-        sock.connect(remote);
+        sock.emplace(context, zmq::socket_type::dealer);
+        setup_outgoing_socket(*sock, remote_pubkey, ephemeral_rid);
+        sock->connect(remote);
     } catch (const zmq::error_t &e) {
         proxy_schedule_reply_job([conn_id, on_failure=std::move(on_failure), what="connect() failed: "s+e.what()] {
             on_failure(conn_id, std::move(what));
@@ -332,7 +335,7 @@ void OxenMQ::proxy_connect_remote(oxenc::bt_dict_consumer data) {
         return;
     }

-    auto &s = connections.emplace_hint(connections.end(), conn_id, std::move(sock))->second;
+    auto &s = connections.emplace_hint(connections.end(), conn_id, std::move(*sock))->second;
     connections_updated = true;
     OMQ_LOG(debug, "Opened new zmq socket to ", remote, ", conn_id ", conn_id, "; sending HI");
     send_direct_message(s, "HI");

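Both hunks above make the same fix: zmq::socket_t construction can itself throw a zmq::error_t (notably EMFILE, "Too many open files", once the context's max_sockets is reached), but the old code constructed the socket outside the try block, so that failure escaped the connect-failure path. Wrapping the socket in std::optional defers construction into the try. A minimal self-contained sketch of the pattern, with a hypothetical helper name:

#include <zmq.hpp>
#include <optional>
#include <iostream>
#include <string>

// try_connect is a hypothetical name; the shape mirrors the diff above.
bool try_connect(zmq::context_t& ctx, const std::string& addr) {
    std::optional<zmq::socket_t> sock;
    try {
        sock.emplace(ctx, zmq::socket_type::dealer);  // can throw EMFILE at the socket limit
        sock->connect(addr);                          // can throw, e.g. on a bad address
    } catch (const zmq::error_t& e) {
        std::cerr << "connect() failed: " << e.what() << '\n';
        return false;
    }
    // Success: std::move(*sock) would transfer the socket into a connection table.
    return true;
}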
oxenmq/oxenmq.cpp

@@ -2,6 +2,7 @@
 #include "oxenmq-internal.h"
 #include "zmq.hpp"
 #include <map>
+#include <mutex>
 #include <random>
 #include <ostream>
 #include <thread>
@@ -236,15 +237,42 @@ void OxenMQ::start() {
     OMQ_LOG(info, "Initializing OxenMQ ", bind.empty() ? "remote-only" : "listener", " with pubkey ", oxenc::to_hex(pubkey));

-    int zmq_socket_limit = context.get(zmq::ctxopt::socket_limit);
-    if (MAX_SOCKETS > 1 && MAX_SOCKETS <= zmq_socket_limit)
-        context.set(zmq::ctxopt::max_sockets, MAX_SOCKETS);
-    else
-        OMQ_LOG(error, "Not applying OxenMQ::MAX_SOCKETS setting: ", MAX_SOCKETS, " must be in [1, ", zmq_socket_limit, "]");
+    assert(general_workers > 0);
+    if (batch_jobs_reserved < 0)
+        batch_jobs_reserved = (general_workers + 1) / 2;
+    if (reply_jobs_reserved < 0)
+        reply_jobs_reserved = (general_workers + 7) / 8;
+
+    max_workers = general_workers + batch_jobs_reserved + reply_jobs_reserved;
+    for (const auto& cat : categories) {
+        max_workers += cat.second.reserved_threads;
+    }
+
+    if (log_level() >= LogLevel::debug) {
+        OMQ_LOG(debug, "Reserving space for ", max_workers, " max workers = ", general_workers, " general plus reservations for:");
+        for (const auto& cat : categories)
+            OMQ_LOG(debug, " - ", cat.first, ": ", cat.second.reserved_threads);
+        OMQ_LOG(debug, " - (batch jobs): ", batch_jobs_reserved);
+        OMQ_LOG(debug, " - (reply jobs): ", reply_jobs_reserved);
+        OMQ_LOG(debug, "Plus ", tagged_workers.size(), " tagged worker threads");
+    }
+
+    if (MAX_SOCKETS != 0) {
+        // The max sockets setting we apply to the context here is used during zmq context
+        // initialization, which happens when the first socket is constructed using this context:
+        // hence we set this *before* constructing any socket_t on the context.
+        int zmq_socket_limit = context.get(zmq::ctxopt::socket_limit);
+        int want_sockets = MAX_SOCKETS < 0 ? zmq_socket_limit :
+            std::min<int>(zmq_socket_limit,
+                    MAX_SOCKETS + max_workers + tagged_workers.size()
+                    + 4 /* zap_auth, workers_socket, command, inproc_listener */);
+        context.set(zmq::ctxopt::max_sockets, want_sockets);
+    }
+
+    // We bind `command` here so that the `get_control_socket()` below is always connecting to a
+    // bound socket, but we do nothing else here: the proxy thread is responsible for everything
+    // except binding it.
+    command = zmq::socket_t{context, zmq::socket_type::router};
     command.bind(SN_ADDR_COMMAND);

     std::promise<void> startup_prom;
     auto proxy_startup = startup_prom.get_future();
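The `want_sockets` computation above gives MAX_SOCKETS its intended meaning of roughly "sockets available for connections and listeners", with OxenMQ's internal sockets budgeted on top. A worked example under assumed inputs (8 general workers, no categories, no tagged workers, and libzmq's typical socket_limit of 65535):

#include <algorithm>
#include <cstdio>

int main() {
    // Assumed inputs; the formulas mirror the hunk above.
    int general_workers = 8, zmq_socket_limit = 65535, MAX_SOCKETS = 10000;
    int batch_jobs_reserved = (general_workers + 1) / 2;  // = 4
    int reply_jobs_reserved = (general_workers + 7) / 8;  // = 1
    int max_workers = general_workers + batch_jobs_reserved + reply_jobs_reserved;  // = 13
    int tagged_workers = 0;
    // + 4 covers zap_auth, workers_socket, command, and the inproc listener:
    int want_sockets = std::min(zmq_socket_limit, MAX_SOCKETS + max_workers + tagged_workers + 4);
    std::printf("max_sockets passed to zmq: %d\n", want_sockets);  // prints 10017
}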
@@ -399,23 +427,13 @@ OxenMQ::run_info& OxenMQ::run_info::load(batch_job&& bj, bool reply_job, int tag
 OxenMQ::~OxenMQ() {
     if (!proxy_thread.joinable()) {
         if (!tagged_workers.empty()) {
-            // This is a bit icky: we have tagged workers that are waiting for a signal on
-            // workers_socket, but the listening end of workers_socket doesn't get set up until the
-            // proxy thread starts (and we're getting destructed here without a proxy thread). So
-            // we need to start listening on it here in the destructor so that we establish a
-            // connection and send the QUITs to the tagged worker threads.
-            workers_socket.set(zmq::sockopt::router_mandatory, true);
-            workers_socket.bind(SN_ADDR_WORKERS);
-            for (auto& [run, busy, queue] : tagged_workers) {
-                while (true) {
-                    try {
-                        route_control(workers_socket, run.worker_routing_id, "QUIT");
-                        break;
-                    } catch (const zmq::error_t&) {
-                        std::this_thread::sleep_for(5ms);
-                    }
-                }
-            }
+            // We have tagged workers that are waiting on a signal for startup, but we didn't start
+            // up, so signal them so that they can end themselves.
+            {
+                std::lock_guard lock{tagged_startup_mutex};
+                tagged_go = true;
+            }
+            tagged_cv.notify_all();
             for (auto& [run, busy, queue] : tagged_workers)
                 run.worker_thread.join();
         }

oxenmq/oxenmq.h

@@ -28,6 +28,7 @@
 #pragma once

+#include <condition_variable>
 #include <string>
 #include <string_view>
 #include <list>
@@ -237,7 +238,9 @@ public:
     /** Maximum open sockets, passed to the ZMQ context during start(). The default here is 10k,
      * designed to be enough to be more than enough to allow a full-mesh SN layer connection if
-     * necessary for the forseeable future. */
+     * necessary for the forseeable future. The actual value passed to ZMQ will be slightly higher,
+     * to allow for internal inter-thread communication sockets. Set to 0 to explicitly avoid
+     * setting the value; set to -1 to use the maximum supported by ZMQ. */
     int MAX_SOCKETS = 10000;

     /** Minimum reconnect interval: when a connection fails or dies, wait this long before
@@ -332,7 +335,7 @@ private:
     /// The socket we listen on for handling ZAP authentication requests (the other end is internal
     /// to zmq which sends requests to us as needed).
-    zmq::socket_t zap_auth{context, zmq::socket_type::rep};
+    zmq::socket_t zap_auth;

     struct bind_data {
         std::string address;
@@ -436,7 +439,7 @@ private:
     /// internal "control" connection (returned by `get_control_socket()`) to this socket used to
     /// give instructions to the proxy such as instructing it to initiate a connection to a remote
     /// or send a message.
-    zmq::socket_t command{context, zmq::socket_type::router};
+    zmq::socket_t command;

     /// Timers. TODO: once cppzmq adds an interface around the zmq C timers API then switch to it.
     struct TimersDeleter { void operator()(void* timers); };
@@ -455,7 +458,7 @@ public:
 private:
     /// Router socket to reach internal worker threads from proxy
-    zmq::socket_t workers_socket{context, zmq::socket_type::router};
+    zmq::socket_t workers_socket;

     /// indices of idle, active workers; note that this vector is usually oversized
     std::vector<unsigned int> idle_workers;
@@ -474,7 +477,7 @@ private:
     int active_workers() const { return workers.size() - idle_worker_count; }

     /// Worker thread loop. Tagged and start are provided for a tagged worker thread.
-    void worker_thread(unsigned int index, std::optional<std::string> tagged = std::nullopt, std::function<void()> start = nullptr);
+    void worker_thread(unsigned int index, std::optional<std::string> tagged, std::function<void()> start);

     /// If set, skip polling for one proxy loop iteration (set when we know we have something
     /// processible without having to shove it onto a socket, such as scheduling an internal job).
@@ -771,11 +774,23 @@ private:
     /// change it.
     std::vector<run_info> workers;

+    /// Dealer sockets for workers to use to talk to the proxy thread. These are initialized during
+    /// start(), and after that belong exclusively to the worker thread with the same index as used
+    /// in `workers`.
+    std::vector<zmq::socket_t> worker_sockets;
+
     /// Workers that are reserved for tagged thread tasks (as created with add_tagged_thread). The
     /// queue here is similar to worker_jobs, but contains only the tagged thread's jobs. The bool
     /// is whether the worker is currently busy (true) or available (false).
     std::vector<std::tuple<run_info, bool, batch_queue>> tagged_workers;

+    /// Startup signalling for tagged workers; the tagged threads get initialized before startup,
+    /// then wait via this bool/c.v. to synchronize startup with the proxy thread. This mutex isn't
+    /// used after startup is complete.
+    std::mutex tagged_startup_mutex;
+    bool tagged_go{false};
+    std::condition_variable tagged_cv;
+
 public:
     /**
      * OxenMQ constructor. This constructs the object but does not start it; you will typically

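In practice the documented MAX_SOCKETS values would be used something like this sketch (values illustrative; the field must be set before start(), which is where it gets applied):

#include <oxenmq/oxenmq.h>

int main() {
    oxenmq::OxenMQ omq;       // a default-constructed (client-mode) instance
    omq.MAX_SOCKETS = 15;     // ~15 connection sockets; internal sockets are added on top
    // omq.MAX_SOCKETS = -1;  // or: use the maximum the linked libzmq supports
    // omq.MAX_SOCKETS = 0;   // or: don't touch the zmq context default at all
    omq.start();              // the computed value is applied to the context here
}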
oxenmq/proxy.cpp

@@ -377,36 +377,23 @@ void OxenMQ::proxy_loop_init() {
     pthread_setname_np("omq-proxy");
 #endif

+    zap_auth = zmq::socket_t{context, zmq::socket_type::rep};
     zap_auth.set(zmq::sockopt::linger, 0);
     zap_auth.bind(ZMQ_ADDR_ZAP);

+    workers_socket = zmq::socket_t{context, zmq::socket_type::router};
     workers_socket.set(zmq::sockopt::router_mandatory, true);
     workers_socket.bind(SN_ADDR_WORKERS);

-    assert(general_workers > 0);
-    if (batch_jobs_reserved < 0)
-        batch_jobs_reserved = (general_workers + 1) / 2;
-    if (reply_jobs_reserved < 0)
-        reply_jobs_reserved = (general_workers + 7) / 8;
-
-    max_workers = general_workers + batch_jobs_reserved + reply_jobs_reserved;
-    for (const auto& cat : categories) {
-        max_workers += cat.second.reserved_threads;
-    }
-
-    if (log_level() >= LogLevel::debug) {
-        OMQ_LOG(debug, "Reserving space for ", max_workers, " max workers = ", general_workers, " general plus reservations for:");
-        for (const auto& cat : categories)
-            OMQ_LOG(debug, " - ", cat.first, ": ", cat.second.reserved_threads);
-        OMQ_LOG(debug, " - (batch jobs): ", batch_jobs_reserved);
-        OMQ_LOG(debug, " - (reply jobs): ", reply_jobs_reserved);
-        OMQ_LOG(debug, "Plus ", tagged_workers.size(), " tagged worker threads");
-    }

     workers.reserve(max_workers);
     idle_workers.resize(max_workers);
-    if (!workers.empty())
+    if (!workers.empty() || !worker_sockets.empty())
         throw std::logic_error("Internal error: proxy thread started with active worker threads");
+    worker_sockets.reserve(max_workers);
+    // Pre-initialize these worker sockets rather than creating during thread initialization so that
+    // we can't hit the zmq socket limit during worker thread startup.
+    for (int i = 0; i < max_workers; i++)
+        worker_sockets.emplace_back(context, zmq::socket_type::dealer);

 #ifndef _WIN32
     int saved_umask = -1;
@@ -466,6 +453,11 @@ void OxenMQ::proxy_loop_init() {
     // synchronization dance to guarantee that the workers are routable before we can proceed.
     if (!tagged_workers.empty()) {
         OMQ_LOG(debug, "Waiting for tagged workers");
+        {
+            std::unique_lock lock{tagged_startup_mutex};
+            tagged_go = true;
+        }
+        tagged_cv.notify_all();
         std::unordered_set<std::string_view> waiting_on;
         for (auto& w : tagged_workers)
             waiting_on.emplace(std::get<run_info>(w).worker_routing_id);

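Two ordering constraints shape this hunk. First, per the comment added in start(), the context's max_sockets is consumed when the first socket is constructed, so zap_auth and workers_socket can no longer be constructed as member initializers and are built here instead. Second, the worker dealer sockets are pre-created so worker startup can never run into the socket limit later. A standalone sketch of the set-before-create ordering, assuming only the cppzmq calls already used in the diff:

#include <zmq.hpp>
#include <vector>

int main() {
    zmq::context_t ctx;
    ctx.set(zmq::ctxopt::max_sockets, 32);  // must precede the first socket on this context
    std::vector<zmq::socket_t> sockets;
    sockets.reserve(32);                    // avoid reallocating/moving sockets while populating
    for (int i = 0; i < 32; i++)            // pre-create, as proxy_loop_init() now does
        sockets.emplace_back(ctx, zmq::socket_type::dealer);
    // Constructing a 33rd socket here would throw zmq::error_t ("Too many open files").
}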
oxenmq/worker.cpp

@@ -62,7 +62,22 @@ void OxenMQ::worker_thread(unsigned int index, std::optional<std::string> tagged
     pthread_setname_np(thread_name.c_str());
 #endif

-    zmq::socket_t sock{context, zmq::socket_type::dealer};
+    std::optional<zmq::socket_t> tagged_socket;
+    if (tagged) {
+        // If we're a tagged worker then we got started up before OxenMQ started, so we need to wait
+        // for an all-clear signal from OxenMQ first, then we fire our `start` callback, then we can
+        // start waiting for commands in the main loop further down. (We also can't get the
+        // reference to our `tagged_workers` element or create a socket until the main proxy thread
+        // is running).
+        {
+            std::unique_lock lock{tagged_startup_mutex};
+            tagged_cv.wait(lock, [this] { return tagged_go; });
+        }
+        if (!proxy_thread.joinable()) // OxenMQ destroyed without starting
+            return;
+        tagged_socket.emplace(context, zmq::socket_type::dealer);
+    }
+    auto& sock = tagged ? *tagged_socket : worker_sockets[index];
     sock.set(zmq::sockopt::routing_id, routing_id);
     OMQ_LOG(debug, "New worker thread ", worker_id, " (", routing_id, ") started");
     sock.connect(SN_ADDR_WORKERS);
@@ -74,11 +89,6 @@ void OxenMQ::worker_thread(unsigned int index, std::optional<std::string> tagged
     bool waiting_for_command;
     if (tagged) {
-        // If we're a tagged worker then we got started up before OxenMQ started, so we need to wait
-        // for an all-clear signal from OxenMQ first, then we fire our `start` callback, then we can
-        // start waiting for commands in the main loop further down. (We also can't get the
-        // reference to our `tagged_workers` element until the main proxy threads is running).
         waiting_for_command = true;
         if (!worker_wait_for(*this, sock, parts, worker_id, "START"sv))
@@ -268,7 +278,7 @@ void OxenMQ::proxy_worker_message(OxenMQ::control_message_array& parts, size_t l
 void OxenMQ::proxy_run_worker(run_info& run) {
     if (!run.worker_thread.joinable())
-        run.worker_thread = std::thread{[this, id=run.worker_id] { worker_thread(id); }};
+        run.worker_thread = std::thread{&OxenMQ::worker_thread, this, run.worker_id, std::nullopt, nullptr};
     else
         send_routed_message(workers_socket, run.worker_routing_id, "RUN");
 }

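Taken together with the destructor and proxy_loop_init() changes above, the new startup handshake is: tagged workers block on tagged_cv until either start() or ~OxenMQ() sets tagged_go, then check proxy_thread.joinable() to decide between running and exiting. A distilled self-contained sketch of that handshake (plain globals standing in for the class members):

#include <condition_variable>
#include <mutex>
#include <thread>
#include <cstdio>

std::mutex tagged_startup_mutex;
std::condition_variable tagged_cv;
bool tagged_go = false;
bool started = false;  // stands in for proxy_thread.joinable()

void tagged_worker() {
    {
        std::unique_lock lock{tagged_startup_mutex};
        tagged_cv.wait(lock, [] { return tagged_go; });  // released by start() or the destructor
    }
    if (!started) return;  // destroyed without starting: exit cleanly
    std::puts("tagged worker running");
}

int main() {
    std::thread t{tagged_worker};
    {
        std::lock_guard lock{tagged_startup_mutex};
        started = true;    // as if start() had launched the proxy thread
        tagged_go = true;
    }
    tagged_cv.notify_all();
    t.join();
}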
tests/CMakeLists.txt

@@ -10,6 +10,7 @@ add_executable(tests
     test_failures.cpp
     test_inject.cpp
     test_requests.cpp
+    test_socket_limit.cpp
     test_tagged_threads.cpp
     test_timer.cpp
 )

tests/test_socket_limit.cpp

@@ -0,0 +1,43 @@
+#include "common.h"
+#include <oxenc/hex.h>
+
+using namespace oxenmq;
+
+TEST_CASE("zmq socket limit", "[zmq][socket-limit]") {
+    // Make sure setting .MAX_SOCKETS works as expected. (This test was added when a bug was fixed
+    // that was causing it not to be applied).
+    std::string listen = random_localhost();
+    OxenMQ server{
+        "", "", // generate ephemeral keys
+        false, // not a service node
+        [](auto) { return ""; },
+    };
+    server.listen_plain(listen);
+    server.start();
+
+    std::atomic<int> failed = 0, good = 0, failed_toomany = 0;
+    OxenMQ client;
+    client.MAX_SOCKETS = 15;
+    client.start();
+
+    std::vector<ConnectionID> conns;
+    address server_addr{listen};
+    for (int i = 0; i < 16; i++)
+        client.connect_remote(server_addr,
+                [&](auto) { good++; },
+                [&](auto cid, auto msg) {
+                    if (msg == "connect() failed: Too many open files")
+                        failed_toomany++;
+                    else
+                        failed++;
+                });
+
+    wait_for([&] { return good > 0 && failed_toomany > 0; });
+    {
+        auto lock = catch_lock();
+        REQUIRE( good > 0 );
+        REQUIRE( failed == 0 );
+        REQUIRE( failed_toomany > 0 );
+    }
+}