From edcde9246a628176e0e20362e5c1495bdf337ee2 Mon Sep 17 00:00:00 2001
From: Jason Rhinelander
Date: Thu, 4 Aug 2022 23:47:54 -0300
Subject: [PATCH 1/2] Fix zmq socket limit setting

MAX_SOCKETS wasn't working properly because ZMQ uses it when the context
is initialized, which happens when the first socket is constructed on
that context.  For OxenMQ, we had several sockets constructed on the
context during OxenMQ construction, which meant the context_t was being
initialized during OxenMQ construction rather than during start(), and
so setting MAX_SOCKETS had no effect: you'd always get the default.

This fixes it by making all the zmq::socket_t member variables
default-constructed, then replacing them with properly constructed
sockets during startup, so that zmq::context_t initialization is also
deferred to the right place.

A second issue found during testing (also fixed here) is that creating
the socket a worker thread uses to communicate with the proxy could
fail if doing so would violate the zmq max sockets limit, which wound
up throwing an uncaught exception and aborting.  We now pre-initialize
(but don't connect) all potential worker thread sockets during start()
so that a lazily-initialized worker thread already has its socket set
up rather than having to create a new one (which could fail).
---
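
For reference, the libzmq behaviour this fix depends on can be reproduced
standalone.  A minimal sketch (illustration only, not part of the commit),
assuming cppzmq >= 4.7 for zmq::ctxopt, which the patched code also uses;
the limit of 2 is arbitrary:

    #include <zmq.hpp>
    #include <iostream>

    int main() {
        zmq::context_t ctx;
        // Takes effect: the context has no sockets yet, so it has not been
        // initialized and still reads this option.
        ctx.set(zmq::ctxopt::max_sockets, 2);
        zmq::socket_t a{ctx, zmq::socket_type::dealer};  // context init happens here
        zmq::socket_t b{ctx, zmq::socket_type::dealer};
        try {
            zmq::socket_t c{ctx, zmq::socket_type::dealer};
        } catch (const zmq::error_t& e) {
            // The third socket should fail with EMFILE ("Too many open files")
            // because the limit was set before the first socket existed.
            std::cout << "third socket: " << e.what() << "\n";
        }
        // Calling ctx.set(zmq::ctxopt::max_sockets, ...) at this point instead
        // would be silently ignored, which is exactly what was happening to
        // MAX_SOCKETS before this fix.
        return 0;
    }
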
 CMakeLists.txt              |  2 +-
 oxenmq/connections.cpp      | 19 +++++++-----
 oxenmq/oxenmq.cpp           | 60 ++++++++++++++++++++++++------------
 oxenmq/oxenmq.h             | 25 ++++++++++++----
 oxenmq/proxy.cpp            | 34 ++++++++-------------
 oxenmq/worker.cpp           | 24 ++++++++++-----
 tests/CMakeLists.txt        |  1 +
 tests/test_socket_limit.cpp | 43 ++++++++++++++++++++++++++
 8 files changed, 145 insertions(+), 63 deletions(-)
 create mode 100644 tests/test_socket_limit.cpp

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 69a3e7b..269d92d 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -17,7 +17,7 @@ cmake_minimum_required(VERSION 3.7)
 set(CMAKE_OSX_DEPLOYMENT_TARGET 10.12 CACHE STRING "macOS deployment target (Apple clang only)")
 
 project(liboxenmq
-    VERSION 1.2.12
+    VERSION 1.2.13
     LANGUAGES CXX C)
 
 include(GNUInstallDirs)
diff --git a/oxenmq/connections.cpp b/oxenmq/connections.cpp
index 79812a5..c5ecd70 100644
--- a/oxenmq/connections.cpp
+++ b/oxenmq/connections.cpp
@@ -1,6 +1,7 @@
 #include "oxenmq.h"
 #include "oxenmq-internal.h"
 #include
+#include <optional>
 
 namespace oxenmq {
 
@@ -156,10 +157,11 @@ OxenMQ::proxy_connect_sn(std::string_view remote, std::string_view connect_hint,
     }
 
     OMQ_LOG(debug, oxenc::to_hex(pubkey), " (me) connecting to ", addr, " to reach ", oxenc::to_hex(remote));
-    zmq::socket_t socket{context, zmq::socket_type::dealer};
-    setup_outgoing_socket(socket, remote, use_ephemeral_routing_id);
+    std::optional<zmq::socket_t> socket;
     try {
-        socket.connect(addr);
+        socket.emplace(context, zmq::socket_type::dealer);
+        setup_outgoing_socket(*socket, remote, use_ephemeral_routing_id);
+        socket->connect(addr);
     } catch (const zmq::error_t& e) {
         // Note that this failure cases indicates something serious went wrong that means zmq isn't
         // even going to try connecting (for example an unparseable remote address).
@@ -175,7 +177,7 @@ OxenMQ::proxy_connect_sn(std::string_view remote, std::string_view connect_hint,
     p.activity();
     connections_updated = true;
     outgoing_sn_conns.emplace_hint(outgoing_sn_conns.end(), p.conn_id, ConnectionID{remote});
-    auto it = connections.emplace_hint(connections.end(), p.conn_id, std::move(socket));
+    auto it = connections.emplace_hint(connections.end(), p.conn_id, *std::move(socket));
 
     return {&it->second, ""s};
 }
@@ -321,10 +323,11 @@ void OxenMQ::proxy_connect_remote(oxenc::bt_dict_consumer data) {
     OMQ_LOG(debug, "Establishing remote connection to ", remote,
             remote_pubkey.empty() ? " (NULL auth)" : " via CURVE expecting pubkey " + oxenc::to_hex(remote_pubkey));
 
-    zmq::socket_t sock{context, zmq::socket_type::dealer};
+    std::optional<zmq::socket_t> sock;
     try {
-        setup_outgoing_socket(sock, remote_pubkey, ephemeral_rid);
-        sock.connect(remote);
+        sock.emplace(context, zmq::socket_type::dealer);
+        setup_outgoing_socket(*sock, remote_pubkey, ephemeral_rid);
+        sock->connect(remote);
     } catch (const zmq::error_t &e) {
         proxy_schedule_reply_job([conn_id, on_failure=std::move(on_failure), what="connect() failed: "s+e.what()] {
             on_failure(conn_id, std::move(what));
@@ -332,7 +335,7 @@ void OxenMQ::proxy_connect_remote(oxenc::bt_dict_consumer data) {
         return;
     }
 
-    auto &s = connections.emplace_hint(connections.end(), conn_id, std::move(sock))->second;
+    auto &s = connections.emplace_hint(connections.end(), conn_id, std::move(*sock))->second;
     connections_updated = true;
     OMQ_LOG(debug, "Opened new zmq socket to ", remote, ", conn_id ", conn_id, "; sending HI");
     send_direct_message(s, "HI");
diff --git a/oxenmq/oxenmq.cpp b/oxenmq/oxenmq.cpp
index 915973f..73706bd 100644
--- a/oxenmq/oxenmq.cpp
+++ b/oxenmq/oxenmq.cpp
@@ -2,6 +2,7 @@
 #include "oxenmq-internal.h"
 #include "zmq.hpp"
 #include
+#include <mutex>
 #include
 #include
 #include
@@ -236,15 +237,42 @@ void OxenMQ::start() {
 
     OMQ_LOG(info, "Initializing OxenMQ ", bind.empty() ? "remote-only" : "listener", " with pubkey ", oxenc::to_hex(pubkey));
 
-    int zmq_socket_limit = context.get(zmq::ctxopt::socket_limit);
-    if (MAX_SOCKETS > 1 && MAX_SOCKETS <= zmq_socket_limit)
-        context.set(zmq::ctxopt::max_sockets, MAX_SOCKETS);
-    else
-        OMQ_LOG(error, "Not applying OxenMQ::MAX_SOCKETS setting: ", MAX_SOCKETS, " must be in [1, ", zmq_socket_limit, "]");
+    assert(general_workers > 0);
+    if (batch_jobs_reserved < 0)
+        batch_jobs_reserved = (general_workers + 1) / 2;
+    if (reply_jobs_reserved < 0)
+        reply_jobs_reserved = (general_workers + 7) / 8;
+
+    max_workers = general_workers + batch_jobs_reserved + reply_jobs_reserved;
+    for (const auto& cat : categories) {
+        max_workers += cat.second.reserved_threads;
+    }
+
+    if (log_level() >= LogLevel::debug) {
+        OMQ_LOG(debug, "Reserving space for ", max_workers, " max workers = ", general_workers, " general plus reservations for:");
+        for (const auto& cat : categories)
+            OMQ_LOG(debug, "    - ", cat.first, ": ", cat.second.reserved_threads);
+        OMQ_LOG(debug, "    - (batch jobs): ", batch_jobs_reserved);
+        OMQ_LOG(debug, "    - (reply jobs): ", reply_jobs_reserved);
+        OMQ_LOG(debug, "Plus ", tagged_workers.size(), " tagged worker threads");
+    }
+
+    if (MAX_SOCKETS != 0) {
+        // The max sockets setting we apply to the context here is used during zmq context
+        // initialization, which happens when the first socket is constructed using this context:
+        // hence we set this *before* constructing any socket_t on the context.
+        int zmq_socket_limit = context.get(zmq::ctxopt::socket_limit);
+        int want_sockets = MAX_SOCKETS < 0 ?
+            zmq_socket_limit :
+            std::min<int>(zmq_socket_limit,
+                    MAX_SOCKETS + max_workers + tagged_workers.size() +
+                    4 /* zap_auth, workers_socket, command, inproc_listener */);
+
+        context.set(zmq::ctxopt::max_sockets, want_sockets);
+    }
 
     // We bind `command` here so that the `get_control_socket()` below is always connecting to a
     // bound socket, but we do nothing else here: the proxy thread is responsible for everything
     // except binding it.
+    command = zmq::socket_t{context, zmq::socket_type::router};
     command.bind(SN_ADDR_COMMAND);
     std::promise<void> startup_prom;
     auto proxy_startup = startup_prom.get_future();
@@ -399,23 +427,13 @@ OxenMQ::run_info& OxenMQ::run_info::load(batch_job&& bj, bool reply_job, int tag
 OxenMQ::~OxenMQ() {
     if (!proxy_thread.joinable()) {
         if (!tagged_workers.empty()) {
-            // This is a bit icky: we have tagged workers that are waiting for a signal on
-            // workers_socket, but the listening end of workers_socket doesn't get set up until the
-            // proxy thread starts (and we're getting destructed here without a proxy thread).  So
-            // we need to start listening on it here in the destructor so that we establish a
-            // connection and send the QUITs to the tagged worker threads.
-            workers_socket.set(zmq::sockopt::router_mandatory, true);
-            workers_socket.bind(SN_ADDR_WORKERS);
-            for (auto& [run, busy, queue] : tagged_workers) {
-                while (true) {
-                    try {
-                        route_control(workers_socket, run.worker_routing_id, "QUIT");
-                        break;
-                    } catch (const zmq::error_t&) {
-                        std::this_thread::sleep_for(5ms);
-                    }
-                }
+            // We have tagged workers that are waiting on a startup signal, but we never started
+            // up, so signal them now so that they can exit.
+            {
+                std::lock_guard lock{tagged_startup_mutex};
+                tagged_go = true;
             }
+            tagged_cv.notify_all();
 
             for (auto& [run, busy, queue] : tagged_workers)
                 run.worker_thread.join();
diff --git a/oxenmq/oxenmq.h b/oxenmq/oxenmq.h
index 996cd24..a3e3361 100644
--- a/oxenmq/oxenmq.h
+++ b/oxenmq/oxenmq.h
@@ -28,6 +28,7 @@
 
 #pragma once
 
+#include <condition_variable>
 #include
 #include
 #include
@@ -237,7 +238,9 @@ public:
 
     /** Maximum open sockets, passed to the ZMQ context during start().  The default here is 10k,
      * designed to be enough to be more than enough to allow a full-mesh SN layer connection if
-     * necessary for the forseeable future. */
+     * necessary for the foreseeable future.  The actual value passed to ZMQ will be slightly
+     * higher, to allow for internal inter-thread communication sockets.  Set to 0 to explicitly
+     * avoid setting the value; set to -1 to use the maximum supported by ZMQ. */
     int MAX_SOCKETS = 10000;
 
     /** Minimum reconnect interval: when a connection fails or dies, wait this long before
@@ -332,7 +335,7 @@ private:
 
     /// The socket we listen on for handling ZAP authentication requests (the other end is internal
     /// to zmq which sends requests to us as needed).
-    zmq::socket_t zap_auth{context, zmq::socket_type::rep};
+    zmq::socket_t zap_auth;
 
     struct bind_data {
         std::string address;
@@ -436,7 +439,7 @@ private:
     /// internal "control" connection (returned by `get_control_socket()`) to this socket used to
     /// give instructions to the proxy such as instructing it to initiate a connection to a remote
     /// or send a message.
-    zmq::socket_t command{context, zmq::socket_type::router};
+    zmq::socket_t command;
 
     /// Timers.  TODO: once cppzmq adds an interface around the zmq C timers API then switch to it.
     struct TimersDeleter { void operator()(void* timers); };
@@ -455,7 +458,7 @@ public:
 
 private:
     /// Router socket to reach internal worker threads from proxy
-    zmq::socket_t workers_socket{context, zmq::socket_type::router};
+    zmq::socket_t workers_socket;
 
     /// indices of idle, active workers; note that this vector is usually oversized
     std::vector<unsigned int> idle_workers;
@@ -474,7 +477,7 @@ private:
     int active_workers() const { return workers.size() - idle_worker_count; }
 
     /// Worker thread loop.  Tagged and start are provided for a tagged worker thread.
-    void worker_thread(unsigned int index, std::optional<std::string> tagged = std::nullopt, std::function<void()> start = nullptr);
+    void worker_thread(unsigned int index, std::optional<std::string> tagged, std::function<void()> start);
 
     /// If set, skip polling for one proxy loop iteration (set when we know we have something
     /// processible without having to shove it onto a socket, such as scheduling an internal job).
@@ -771,11 +774,23 @@ private:
     /// change it.
     std::vector<run_info> workers;
 
+    /// Dealer sockets for workers to use to talk to the proxy thread.  These are initialized
+    /// during start(), and after that belong exclusively to the worker thread with the same index
+    /// as used in `workers`.
+    std::vector<zmq::socket_t> worker_sockets;
+
     /// Workers that are reserved for tagged thread tasks (as created with add_tagged_thread).  The
     /// queue here is similar to worker_jobs, but contains only the tagged thread's jobs.  The bool
     /// is whether the worker is currently busy (true) or available (false).
     std::vector<std::tuple<run_info, bool, std::queue<batch_job>>> tagged_workers;
 
+    /// Startup signalling for tagged workers; the tagged threads get initialized before startup,
+    /// then wait via this bool/c.v. to synchronize startup with the proxy thread.  This mutex
+    /// isn't used after startup is complete.
+    std::mutex tagged_startup_mutex;
+    bool tagged_go{false};
+    std::condition_variable tagged_cv;
+
 public:
     /**
      * OxenMQ constructor.
     * This constructs the object but does not start it; you will typically
diff --git a/oxenmq/proxy.cpp b/oxenmq/proxy.cpp
index 28275c2..5732fe1 100644
--- a/oxenmq/proxy.cpp
+++ b/oxenmq/proxy.cpp
@@ -377,36 +377,23 @@ void OxenMQ::proxy_loop_init() {
     pthread_setname_np("omq-proxy");
 #endif
 
+    zap_auth = zmq::socket_t{context, zmq::socket_type::rep};
     zap_auth.set(zmq::sockopt::linger, 0);
     zap_auth.bind(ZMQ_ADDR_ZAP);
 
+    workers_socket = zmq::socket_t{context, zmq::socket_type::router};
     workers_socket.set(zmq::sockopt::router_mandatory, true);
     workers_socket.bind(SN_ADDR_WORKERS);
 
-    assert(general_workers > 0);
-    if (batch_jobs_reserved < 0)
-        batch_jobs_reserved = (general_workers + 1) / 2;
-    if (reply_jobs_reserved < 0)
-        reply_jobs_reserved = (general_workers + 7) / 8;
-
-    max_workers = general_workers + batch_jobs_reserved + reply_jobs_reserved;
-    for (const auto& cat : categories) {
-        max_workers += cat.second.reserved_threads;
-    }
-
-    if (log_level() >= LogLevel::debug) {
-        OMQ_LOG(debug, "Reserving space for ", max_workers, " max workers = ", general_workers, " general plus reservations for:");
-        for (const auto& cat : categories)
-            OMQ_LOG(debug, "    - ", cat.first, ": ", cat.second.reserved_threads);
-        OMQ_LOG(debug, "    - (batch jobs): ", batch_jobs_reserved);
-        OMQ_LOG(debug, "    - (reply jobs): ", reply_jobs_reserved);
-        OMQ_LOG(debug, "Plus ", tagged_workers.size(), " tagged worker threads");
-    }
-
     workers.reserve(max_workers);
     idle_workers.resize(max_workers);
-    if (!workers.empty())
+    if (!workers.empty() || !worker_sockets.empty())
         throw std::logic_error("Internal error: proxy thread started with active worker threads");
+    worker_sockets.reserve(max_workers);
+    // Pre-initialize these worker sockets rather than creating them during thread initialization
+    // so that we can't hit the zmq socket limit during worker thread startup.
+    for (int i = 0; i < max_workers; i++)
+        worker_sockets.emplace_back(context, zmq::socket_type::dealer);
 
 #ifndef _WIN32
     int saved_umask = -1;
@@ -466,6 +453,11 @@ void OxenMQ::proxy_loop_init() {
     // synchronization dance to guarantee that the workers are routable before we can proceed.
     if (!tagged_workers.empty()) {
         OMQ_LOG(debug, "Waiting for tagged workers");
+        {
+            std::unique_lock lock{tagged_startup_mutex};
+            tagged_go = true;
+        }
+        tagged_cv.notify_all();
         std::unordered_set<std::string_view> waiting_on;
         for (auto& w : tagged_workers)
             waiting_on.emplace(std::get<0>(w).worker_routing_id);
diff --git a/oxenmq/worker.cpp b/oxenmq/worker.cpp
index d10e938..fb54735 100644
--- a/oxenmq/worker.cpp
+++ b/oxenmq/worker.cpp
@@ -62,7 +62,22 @@ void OxenMQ::worker_thread(unsigned int index, std::optional<std::string> tagged
     pthread_setname_np(thread_name.c_str());
 #endif
 
-    zmq::socket_t sock{context, zmq::socket_type::dealer};
+    std::optional<zmq::socket_t> tagged_socket;
+    if (tagged) {
+        // If we're a tagged worker then we got started up before OxenMQ started, so we need to
+        // wait for an all-clear signal from OxenMQ first; then we fire our `start` callback, and
+        // then we can start waiting for commands in the main loop further down.  (We also can't
+        // get the reference to our `tagged_workers` element or create a socket until the main
+        // proxy thread is running.)
+        {
+            std::unique_lock lock{tagged_startup_mutex};
+            tagged_cv.wait(lock, [this] { return tagged_go; });
+        }
+        if (!proxy_thread.joinable()) // OxenMQ destroyed without starting
+            return;
+        tagged_socket.emplace(context, zmq::socket_type::dealer);
+    }
+    auto& sock = tagged ?
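
A note on the synchronization added above (illustration only, not part of
the commit): the tagged-worker startup gate is the standard bool-under-mutex
plus condition_variable pattern.  A self-contained sketch with simplified
names (go/start_cv/start_mutex stand in for the patch's
tagged_go/tagged_cv/tagged_startup_mutex):

    #include <condition_variable>
    #include <mutex>
    #include <thread>
    #include <vector>

    std::mutex start_mutex;
    std::condition_variable start_cv;
    bool go = false;

    void tagged_worker() {
        // Block until released; only after waking do we know whether the
        // proxy actually started, so socket creation happens after this.
        std::unique_lock lock{start_mutex};
        start_cv.wait(lock, [] { return go; });
        // ... create the dealer socket and enter the job loop ...
    }

    int main() {
        std::vector<std::thread> workers;
        for (int i = 0; i < 4; i++)
            workers.emplace_back(tagged_worker);
        {
            std::lock_guard lock{start_mutex};
            go = true;  // flip under the mutex so no waiter can miss it
        }
        start_cv.notify_all();
        for (auto& t : workers)
            t.join();
    }

In the patch, both the proxy thread (normal startup) and the destructor
(when start() was never called) play the role of main() here, which is why
the woken workers re-check proxy_thread.joinable() before creating sockets.
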
+        *tagged_socket : worker_sockets[index];
     sock.set(zmq::sockopt::routing_id, routing_id);
     OMQ_LOG(debug, "New worker thread ", worker_id, " (", routing_id, ") started");
     sock.connect(SN_ADDR_WORKERS);
@@ -74,11 +89,6 @@ void OxenMQ::worker_thread(unsigned int index, std::optional<std::string> tagged
 
     bool waiting_for_command;
     if (tagged) {
-        // If we're a tagged worker then we got started up before OxenMQ started, so we need to wait
-        // for an all-clear signal from OxenMQ first, then we fire our `start` callback, then we can
-        // start waiting for commands in the main loop further down.  (We also can't get the
-        // reference to our `tagged_workers` element until the main proxy threads is running).
-
         waiting_for_command = true;
 
         if (!worker_wait_for(*this, sock, parts, worker_id, "START"sv))
@@ -268,7 +278,7 @@ void OxenMQ::proxy_worker_message(OxenMQ::control_message_array& parts, size_t l
 
 void OxenMQ::proxy_run_worker(run_info& run) {
     if (!run.worker_thread.joinable())
-        run.worker_thread = std::thread{[this, id=run.worker_id] { worker_thread(id); }};
+        run.worker_thread = std::thread{&OxenMQ::worker_thread, this, run.worker_id, std::nullopt, nullptr};
     else
         send_routed_message(workers_socket, run.worker_routing_id, "RUN");
 }
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 8586c2a..fadf85a 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -10,6 +10,7 @@ add_executable(tests
     test_failures.cpp
     test_inject.cpp
     test_requests.cpp
+    test_socket_limit.cpp
     test_tagged_threads.cpp
     test_timer.cpp
     )
diff --git a/tests/test_socket_limit.cpp b/tests/test_socket_limit.cpp
new file mode 100644
index 0000000..ed77e9e
--- /dev/null
+++ b/tests/test_socket_limit.cpp
@@ -0,0 +1,43 @@
+#include "common.h"
+#include <atomic>
+
+using namespace oxenmq;
+
+TEST_CASE("zmq socket limit", "[zmq][socket-limit]") {
+    // Make sure setting .MAX_SOCKETS works as expected.  (This test was added when a bug was
+    // fixed that was causing it not to be applied).
+    std::string listen = random_localhost();
+    OxenMQ server{
+        "", "", // generate ephemeral keys
+        false, // not a service node
+        [](auto) { return ""; },
+    };
+    server.listen_plain(listen);
+    server.start();
+
+    std::atomic<int> failed = 0, good = 0, failed_toomany = 0;
+    OxenMQ client;
+    client.MAX_SOCKETS = 15;
+    client.start();
+
+    std::vector<ConnectionID> conns;
+    address server_addr{listen};
+    for (int i = 0; i < 16; i++)
+        client.connect_remote(server_addr,
+                [&](auto) { good++; },
+                [&](auto cid, auto msg) {
+                    if (msg == "connect() failed: Too many open files")
+                        failed_toomany++;
+                    else
+                        failed++;
+                });
+
+    wait_for([&] { return good > 0 && failed_toomany > 0; });
+    {
+        auto lock = catch_lock();
+        REQUIRE( good > 0 );
+        REQUIRE( failed == 0 );
+        REQUIRE( failed_toomany > 0 );
+    }
+}

From 3a3ffa7d23921ac4a7027b2a8692f934a8d03a6f Mon Sep 17 00:00:00 2001
From: Jason Rhinelander
Date: Fri, 5 Aug 2022 10:30:01 -0300
Subject: [PATCH 2/2] Increase ulimit on macOS

The test suite is now running out of file descriptors because of
macOS's tiny default limit.
---
 .drone.jsonnet | 1 +
 1 file changed, 1 insertion(+)

diff --git a/.drone.jsonnet b/.drone.jsonnet
index 522d403..005da65 100644
--- a/.drone.jsonnet
+++ b/.drone.jsonnet
@@ -103,6 +103,7 @@ local full_llvm(version) = debian_pipeline(
       commands: [
         'mkdir build',
         'cd build',
+        'ulimit -n 1024',  // Because macOS has a stupid tiny default ulimit
         'cmake .. -G Ninja -DCMAKE_CXX_FLAGS=-fcolor-diagnostics -DCMAKE_BUILD_TYPE=Release -DCMAKE_CXX_COMPILER_LAUNCHER=ccache',
         'ninja -v',
         './tests/tests --use-colour yes',
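
From the caller's side, the behaviour exercised by the new
tests/test_socket_limit.cpp looks like this (a sketch; the values 15 and 16
mirror the test's):

    oxenmq::OxenMQ client;
    client.MAX_SOCKETS = 15;  // must be set before start(); start() now
                              // applies it before any socket exists on
                              // the context
    client.start();
    // Opening 16 simultaneous remote connections now makes at least one
    // fail with "connect() failed: Too many open files" instead of the
    // limit being silently ignored.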