diff --git a/.drone.jsonnet b/.drone.jsonnet
index 522d403..005da65 100644
--- a/.drone.jsonnet
+++ b/.drone.jsonnet
@@ -103,6 +103,7 @@ local full_llvm(version) = debian_pipeline(
     commands: [
       'mkdir build',
       'cd build',
+      'ulimit -n 1024', // Because macOS has a stupid tiny default ulimit
       'cmake .. -G Ninja -DCMAKE_CXX_FLAGS=-fcolor-diagnostics -DCMAKE_BUILD_TYPE=Release -DCMAKE_CXX_COMPILER_LAUNCHER=ccache',
       'ninja -v',
       './tests/tests --use-colour yes',
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 69a3e7b..269d92d 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -17,7 +17,7 @@ cmake_minimum_required(VERSION 3.7)
 set(CMAKE_OSX_DEPLOYMENT_TARGET 10.12 CACHE STRING "macOS deployment target (Apple clang only)")
 
 project(liboxenmq
-    VERSION 1.2.12
+    VERSION 1.2.13
     LANGUAGES CXX C)
 
 include(GNUInstallDirs)
diff --git a/oxenmq/connections.cpp b/oxenmq/connections.cpp
index 79812a5..c5ecd70 100644
--- a/oxenmq/connections.cpp
+++ b/oxenmq/connections.cpp
@@ -1,6 +1,7 @@
 #include "oxenmq.h"
 #include "oxenmq-internal.h"
 #include
+#include <optional>
 
 namespace oxenmq {
 
@@ -156,10 +157,11 @@ OxenMQ::proxy_connect_sn(std::string_view remote, std::string_view connect_hint,
     }
 
     OMQ_LOG(debug, oxenc::to_hex(pubkey), " (me) connecting to ", addr, " to reach ", oxenc::to_hex(remote));
-    zmq::socket_t socket{context, zmq::socket_type::dealer};
-    setup_outgoing_socket(socket, remote, use_ephemeral_routing_id);
+    std::optional<zmq::socket_t> socket;
     try {
-        socket.connect(addr);
+        socket.emplace(context, zmq::socket_type::dealer);
+        setup_outgoing_socket(*socket, remote, use_ephemeral_routing_id);
+        socket->connect(addr);
     } catch (const zmq::error_t& e) {
         // Note that this failure cases indicates something serious went wrong that means zmq isn't
         // even going to try connecting (for example an unparseable remote address).
@@ -175,7 +177,7 @@ OxenMQ::proxy_connect_sn(std::string_view remote, std::string_view connect_hint,
     p.activity();
     connections_updated = true;
     outgoing_sn_conns.emplace_hint(outgoing_sn_conns.end(), p.conn_id, ConnectionID{remote});
-    auto it = connections.emplace_hint(connections.end(), p.conn_id, std::move(socket));
+    auto it = connections.emplace_hint(connections.end(), p.conn_id, *std::move(socket));
 
     return {&it->second, ""s};
 }
@@ -321,10 +323,11 @@ void OxenMQ::proxy_connect_remote(oxenc::bt_dict_consumer data) {
 
     OMQ_LOG(debug, "Establishing remote connection to ", remote, remote_pubkey.empty() ?
" (NULL auth)" : " via CURVE expecting pubkey " + oxenc::to_hex(remote_pubkey)); - zmq::socket_t sock{context, zmq::socket_type::dealer}; + std::optional sock; try { - setup_outgoing_socket(sock, remote_pubkey, ephemeral_rid); - sock.connect(remote); + sock.emplace(context, zmq::socket_type::dealer); + setup_outgoing_socket(*sock, remote_pubkey, ephemeral_rid); + sock->connect(remote); } catch (const zmq::error_t &e) { proxy_schedule_reply_job([conn_id, on_failure=std::move(on_failure), what="connect() failed: "s+e.what()] { on_failure(conn_id, std::move(what)); @@ -332,7 +335,7 @@ void OxenMQ::proxy_connect_remote(oxenc::bt_dict_consumer data) { return; } - auto &s = connections.emplace_hint(connections.end(), conn_id, std::move(sock))->second; + auto &s = connections.emplace_hint(connections.end(), conn_id, std::move(*sock))->second; connections_updated = true; OMQ_LOG(debug, "Opened new zmq socket to ", remote, ", conn_id ", conn_id, "; sending HI"); send_direct_message(s, "HI"); diff --git a/oxenmq/oxenmq.cpp b/oxenmq/oxenmq.cpp index 915973f..73706bd 100644 --- a/oxenmq/oxenmq.cpp +++ b/oxenmq/oxenmq.cpp @@ -2,6 +2,7 @@ #include "oxenmq-internal.h" #include "zmq.hpp" #include +#include #include #include #include @@ -236,15 +237,42 @@ void OxenMQ::start() { OMQ_LOG(info, "Initializing OxenMQ ", bind.empty() ? "remote-only" : "listener", " with pubkey ", oxenc::to_hex(pubkey)); - int zmq_socket_limit = context.get(zmq::ctxopt::socket_limit); - if (MAX_SOCKETS > 1 && MAX_SOCKETS <= zmq_socket_limit) - context.set(zmq::ctxopt::max_sockets, MAX_SOCKETS); - else - OMQ_LOG(error, "Not applying OxenMQ::MAX_SOCKETS setting: ", MAX_SOCKETS, " must be in [1, ", zmq_socket_limit, "]"); + assert(general_workers > 0); + if (batch_jobs_reserved < 0) + batch_jobs_reserved = (general_workers + 1) / 2; + if (reply_jobs_reserved < 0) + reply_jobs_reserved = (general_workers + 7) / 8; + + max_workers = general_workers + batch_jobs_reserved + reply_jobs_reserved; + for (const auto& cat : categories) { + max_workers += cat.second.reserved_threads; + } + + if (log_level() >= LogLevel::debug) { + OMQ_LOG(debug, "Reserving space for ", max_workers, " max workers = ", general_workers, " general plus reservations for:"); + for (const auto& cat : categories) + OMQ_LOG(debug, " - ", cat.first, ": ", cat.second.reserved_threads); + OMQ_LOG(debug, " - (batch jobs): ", batch_jobs_reserved); + OMQ_LOG(debug, " - (reply jobs): ", reply_jobs_reserved); + OMQ_LOG(debug, "Plus ", tagged_workers.size(), " tagged worker threads"); + } + + if (MAX_SOCKETS != 0) { + // The max sockets setting we apply to the context here is used during zmq context + // initialization, which happens when the first socket is constructed using this context: + // hence we set this *before* constructing any socket_t on the context. + int zmq_socket_limit = context.get(zmq::ctxopt::socket_limit); + int want_sockets = MAX_SOCKETS < 0 ? zmq_socket_limit : + std::min(zmq_socket_limit, + MAX_SOCKETS + max_workers + tagged_workers.size() + + 4 /* zap_auth, workers_socket, command, inproc_listener */); + context.set(zmq::ctxopt::max_sockets, want_sockets); + } // We bind `command` here so that the `get_control_socket()` below is always connecting to a // bound socket, but we do nothing else here: the proxy thread is responsible for everything // except binding it. 
+    command = zmq::socket_t{context, zmq::socket_type::router};
     command.bind(SN_ADDR_COMMAND);
     std::promise startup_prom;
     auto proxy_startup = startup_prom.get_future();
@@ -399,23 +427,13 @@ OxenMQ::run_info& OxenMQ::run_info::load(batch_job&& bj, bool reply_job, int tag
 
 OxenMQ::~OxenMQ() {
     if (!proxy_thread.joinable()) {
         if (!tagged_workers.empty()) {
-            // This is a bit icky: we have tagged workers that are waiting for a signal on
-            // workers_socket, but the listening end of workers_socket doesn't get set up until the
-            // proxy thread starts (and we're getting destructed here without a proxy thread). So
-            // we need to start listening on it here in the destructor so that we establish a
-            // connection and send the QUITs to the tagged worker threads.
-            workers_socket.set(zmq::sockopt::router_mandatory, true);
-            workers_socket.bind(SN_ADDR_WORKERS);
-            for (auto& [run, busy, queue] : tagged_workers) {
-                while (true) {
-                    try {
-                        route_control(workers_socket, run.worker_routing_id, "QUIT");
-                        break;
-                    } catch (const zmq::error_t&) {
-                        std::this_thread::sleep_for(5ms);
-                    }
-                }
+            // We have tagged workers that are waiting on a signal for startup, but we didn't start
+            // up, so signal them so that they can end themselves.
+            {
+                std::lock_guard lock{tagged_startup_mutex};
+                tagged_go = true;
             }
+            tagged_cv.notify_all();
             for (auto& [run, busy, queue] : tagged_workers)
                 run.worker_thread.join();
         }
diff --git a/oxenmq/oxenmq.h b/oxenmq/oxenmq.h
index 996cd24..a3e3361 100644
--- a/oxenmq/oxenmq.h
+++ b/oxenmq/oxenmq.h
@@ -28,6 +28,7 @@
 
 #pragma once
 
+#include <condition_variable>
 #include
 #include
 #include
@@ -237,7 +238,9 @@ public:
 
     /** Maximum open sockets, passed to the ZMQ context during start(). The default here is 10k,
      * designed to be enough to be more than enough to allow a full-mesh SN layer connection if
-     * necessary for the forseeable future. */
+     * necessary for the foreseeable future. The actual value passed to ZMQ will be slightly higher,
+     * to allow for internal inter-thread communication sockets. Set to 0 to explicitly avoid
+     * setting the value; set to -1 to use the maximum supported by ZMQ. */
     int MAX_SOCKETS = 10000;
 
     /** Minimum reconnect interval: when a connection fails or dies, wait this long before
@@ -332,7 +335,7 @@ private:
 
     /// The socket we listen on for handling ZAP authentication requests (the other end is internal
     /// to zmq which sends requests to us as needed).
-    zmq::socket_t zap_auth{context, zmq::socket_type::rep};
+    zmq::socket_t zap_auth;
 
     struct bind_data {
         std::string address;
@@ -436,7 +439,7 @@ private:
     /// internal "control" connection (returned by `get_control_socket()`) to this socket used to
     /// give instructions to the proxy such as instructing it to initiate a connection to a remote
     /// or send a message.
-    zmq::socket_t command{context, zmq::socket_type::router};
+    zmq::socket_t command;
 
     /// Timers. TODO: once cppzmq adds an interface around the zmq C timers API then switch to it.
     struct TimersDeleter { void operator()(void* timers); };
@@ -455,7 +458,7 @@ public:
 private:
 
     /// Router socket to reach internal worker threads from proxy
-    zmq::socket_t workers_socket{context, zmq::socket_type::router};
+    zmq::socket_t workers_socket;
 
     /// indices of idle, active workers; note that this vector is usually oversized
     std::vector idle_workers;
@@ -474,7 +477,7 @@ private:
 
     int active_workers() const { return workers.size() - idle_worker_count; }
 
     /// Worker thread loop. Tagged and start are provided for a tagged worker thread.
-    void worker_thread(unsigned int index, std::optional tagged = std::nullopt, std::function start = nullptr);
+    void worker_thread(unsigned int index, std::optional tagged, std::function start);
 
     /// If set, skip polling for one proxy loop iteration (set when we know we have something
     /// processible without having to shove it onto a socket, such as scheduling an internal job).
@@ -771,11 +774,23 @@ private:
     /// change it.
     std::vector workers;
 
+    /// Dealer sockets for workers to use to talk to the proxy thread. These are initialized during
+    /// start(), and after that belong exclusively to the worker thread with the same index as used
+    /// in `workers`.
+    std::vector<zmq::socket_t> worker_sockets;
+
     /// Workers that are reserved for tagged thread tasks (as created with add_tagged_thread). The
     /// queue here is similar to worker_jobs, but contains only the tagged thread's jobs. The bool
     /// is whether the worker is currently busy (true) or available (false).
     std::vector> tagged_workers;
 
+    /// Startup signalling for tagged workers; the tagged threads get initialized before startup,
+    /// then wait via this bool/c.v. to synchronize startup with the proxy thread. This mutex isn't
+    /// used after startup is complete.
+    std::mutex tagged_startup_mutex;
+    bool tagged_go{false};
+    std::condition_variable tagged_cv;
+
 public:
 
     /**
      * OxenMQ constructor. This constructs the object but does not start it; you will typically
diff --git a/oxenmq/proxy.cpp b/oxenmq/proxy.cpp
index 28275c2..5732fe1 100644
--- a/oxenmq/proxy.cpp
+++ b/oxenmq/proxy.cpp
@@ -377,36 +377,23 @@ void OxenMQ::proxy_loop_init() {
     pthread_setname_np("omq-proxy");
 #endif
 
+    zap_auth = zmq::socket_t{context, zmq::socket_type::rep};
     zap_auth.set(zmq::sockopt::linger, 0);
     zap_auth.bind(ZMQ_ADDR_ZAP);
 
+    workers_socket = zmq::socket_t{context, zmq::socket_type::router};
     workers_socket.set(zmq::sockopt::router_mandatory, true);
     workers_socket.bind(SN_ADDR_WORKERS);
 
-    assert(general_workers > 0);
-    if (batch_jobs_reserved < 0)
-        batch_jobs_reserved = (general_workers + 1) / 2;
-    if (reply_jobs_reserved < 0)
-        reply_jobs_reserved = (general_workers + 7) / 8;
-
-    max_workers = general_workers + batch_jobs_reserved + reply_jobs_reserved;
-    for (const auto& cat : categories) {
-        max_workers += cat.second.reserved_threads;
-    }
-
-    if (log_level() >= LogLevel::debug) {
-        OMQ_LOG(debug, "Reserving space for ", max_workers, " max workers = ", general_workers, " general plus reservations for:");
-        for (const auto& cat : categories)
-            OMQ_LOG(debug, " - ", cat.first, ": ", cat.second.reserved_threads);
-        OMQ_LOG(debug, " - (batch jobs): ", batch_jobs_reserved);
-        OMQ_LOG(debug, " - (reply jobs): ", reply_jobs_reserved);
-        OMQ_LOG(debug, "Plus ", tagged_workers.size(), " tagged worker threads");
-    }
-
     workers.reserve(max_workers);
     idle_workers.resize(max_workers);
-    if (!workers.empty())
+    if (!workers.empty() || !worker_sockets.empty())
         throw std::logic_error("Internal error: proxy thread started with active worker threads");
+    worker_sockets.reserve(max_workers);
+    // Pre-initialize these worker sockets rather than creating during thread initialization so that
+    // we can't hit the zmq socket limit during worker thread startup.
+    for (int i = 0; i < max_workers; i++)
+        worker_sockets.emplace_back(context, zmq::socket_type::dealer);
 
 #ifndef _WIN32
     int saved_umask = -1;
@@ -466,6 +453,11 @@ void OxenMQ::proxy_loop_init() {
     // synchronization dance to guarantee that the workers are routable before we can proceed.
     if (!tagged_workers.empty()) {
         OMQ_LOG(debug, "Waiting for tagged workers");
+        {
+            std::unique_lock lock{tagged_startup_mutex};
+            tagged_go = true;
+        }
+        tagged_cv.notify_all();
         std::unordered_set waiting_on;
         for (auto& w : tagged_workers)
             waiting_on.emplace(std::get<0>(w).worker_routing_id);
diff --git a/oxenmq/worker.cpp b/oxenmq/worker.cpp
index d10e938..fb54735 100644
--- a/oxenmq/worker.cpp
+++ b/oxenmq/worker.cpp
@@ -62,7 +62,22 @@ void OxenMQ::worker_thread(unsigned int index, std::optional tagged
     pthread_setname_np(thread_name.c_str());
 #endif
 
-    zmq::socket_t sock{context, zmq::socket_type::dealer};
+    std::optional<zmq::socket_t> tagged_socket;
+    if (tagged) {
+        // If we're a tagged worker then we got started up before OxenMQ started, so we need to wait
+        // for an all-clear signal from OxenMQ first, then we fire our `start` callback, then we can
+        // start waiting for commands in the main loop further down. (We also can't get the
+        // reference to our `tagged_workers` element or create a socket until the main proxy thread
+        // is running).
+        {
+            std::unique_lock lock{tagged_startup_mutex};
+            tagged_cv.wait(lock, [this] { return tagged_go; });
+        }
+        if (!proxy_thread.joinable()) // OxenMQ destroyed without starting
+            return;
+        tagged_socket.emplace(context, zmq::socket_type::dealer);
+    }
+    auto& sock = tagged ? *tagged_socket : worker_sockets[index];
     sock.set(zmq::sockopt::routing_id, routing_id);
     OMQ_LOG(debug, "New worker thread ", worker_id, " (", routing_id, ") started");
     sock.connect(SN_ADDR_WORKERS);
@@ -74,11 +89,6 @@ void OxenMQ::worker_thread(unsigned int index, std::optional tagged
 
     bool waiting_for_command;
     if (tagged) {
-        // If we're a tagged worker then we got started up before OxenMQ started, so we need to wait
-        // for an all-clear signal from OxenMQ first, then we fire our `start` callback, then we can
-        // start waiting for commands in the main loop further down. (We also can't get the
-        // reference to our `tagged_workers` element until the main proxy threads is running).
-
         waiting_for_command = true;
 
         if (!worker_wait_for(*this, sock, parts, worker_id, "START"sv))
@@ -268,7 +278,7 @@ void OxenMQ::proxy_worker_message(OxenMQ::control_message_array& parts, size_t l
 
 void OxenMQ::proxy_run_worker(run_info& run) {
     if (!run.worker_thread.joinable())
-        run.worker_thread = std::thread{[this, id=run.worker_id] { worker_thread(id); }};
+        run.worker_thread = std::thread{&OxenMQ::worker_thread, this, run.worker_id, std::nullopt, nullptr};
     else
         send_routed_message(workers_socket, run.worker_routing_id, "RUN");
 }
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 8586c2a..fadf85a 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -10,6 +10,7 @@ add_executable(tests
     test_failures.cpp
     test_inject.cpp
    test_requests.cpp
+    test_socket_limit.cpp
     test_tagged_threads.cpp
     test_timer.cpp
     )
diff --git a/tests/test_socket_limit.cpp b/tests/test_socket_limit.cpp
new file mode 100644
index 0000000..ed77e9e
--- /dev/null
+++ b/tests/test_socket_limit.cpp
@@ -0,0 +1,43 @@
+#include "common.h"
+#include
+
+using namespace oxenmq;
+
+TEST_CASE("zmq socket limit", "[zmq][socket-limit]") {
+    // Make sure setting .MAX_SOCKETS works as expected. (This test was added when a bug was fixed
+    // that was causing it not to be applied).
+
+    std::string listen = random_localhost();
+    OxenMQ server{
+        "", "", // generate ephemeral keys
+        false, // not a service node
+        [](auto) { return ""; },
+    };
+    server.listen_plain(listen);
+    server.start();
+
+    std::atomic<int> failed = 0, good = 0, failed_toomany = 0;
+    OxenMQ client;
+    client.MAX_SOCKETS = 15;
+    client.start();
+
+    std::vector conns;
+    address server_addr{listen};
+    for (int i = 0; i < 16; i++)
+        client.connect_remote(server_addr,
+                [&](auto) { good++; },
+                [&](auto cid, auto msg) {
+                    if (msg == "connect() failed: Too many open files")
+                        failed_toomany++;
+                    else
+                        failed++;
+                });
+
+    wait_for([&] { return good > 0 && failed_toomany > 0; });
+    {
+        auto lock = catch_lock();
+        REQUIRE( good > 0 );
+        REQUIRE( failed == 0 );
+        REQUIRE( failed_toomany > 0 );
+    }
+}
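
For reference only (not part of the patch): a minimal sketch of how the revised MAX_SOCKETS semantics documented in the oxenmq.h hunk above are meant to be used from application code. The default-constructed OxenMQ object, the public MAX_SOCKETS member, and start() all appear in this diff; the <oxenmq/oxenmq.h> include path is assumed from the installed header layout.

    #include <oxenmq/oxenmq.h>

    int main() {
        oxenmq::OxenMQ omq;  // client-only instance, default-constructed as in the test above

        // A positive value caps the number of zmq sockets; per the start() changes above,
        // OxenMQ adds a little headroom for its internal worker/control sockets before
        // applying the value to the zmq context.
        omq.MAX_SOCKETS = 200;
        // omq.MAX_SOCKETS = 0;   // leave zmq's own default limit untouched
        // omq.MAX_SOCKETS = -1;  // ask for the maximum zmq supports

        omq.start();  // the limit is applied here, before any zmq socket is constructed
        return 0;
    }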