From 85d35fa505ce8fe12fa7b89f28e53d00b2ec44b8 Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Tue, 30 Nov 2021 13:57:53 -0400 Subject: [PATCH] Propagate proxy thread startup exceptions Currently if the proxy thread fails to start (typically because a bind fails) the exception happens in the proxy thread which is uncatchable by the caller (and aborts the program). This makes it nicer by transporting startup exceptions back to the start() call. --- oxenmq/oxenmq.cpp | 8 +++++++- oxenmq/oxenmq.h | 12 ++++++++++-- oxenmq/proxy.cpp | 29 +++++++++++++++++++---------- 3 files changed, 36 insertions(+), 13 deletions(-) diff --git a/oxenmq/oxenmq.cpp b/oxenmq/oxenmq.cpp index e53f499..a3539c1 100644 --- a/oxenmq/oxenmq.cpp +++ b/oxenmq/oxenmq.cpp @@ -5,6 +5,7 @@ #include #include #include +#include extern "C" { #include @@ -244,7 +245,12 @@ void OxenMQ::start() { // bound socket, but we do nothing else here: the proxy thread is responsible for everything // except binding it. command.bind(SN_ADDR_COMMAND); - proxy_thread = std::thread{&OxenMQ::proxy_loop, this}; + std::promise startup_prom; + auto proxy_startup = startup_prom.get_future(); + proxy_thread = std::thread{&OxenMQ::proxy_loop, this, std::move(startup_prom)}; + + OMQ_LOG(debug, "Waiting for proxy thread to initialize..."); + proxy_startup.get(); // Rethrows exceptions from the proxy startup (e.g. failure to bind) OMQ_LOG(debug, "Waiting for proxy thread to get ready..."); auto &control = get_control_socket(); diff --git a/oxenmq/oxenmq.h b/oxenmq/oxenmq.h index b83fcca..e870e99 100644 --- a/oxenmq/oxenmq.h +++ b/oxenmq/oxenmq.h @@ -56,6 +56,10 @@ #error "ZMQ >= 4.3.0 required" #endif +namespace std { +template class promise; +} + namespace oxenmq { using namespace std::literals; @@ -467,8 +471,9 @@ private: /// processible without having to shove it onto a socket, such as scheduling an internal job). bool proxy_skip_one_poll = false; - /// Does the proxying work - void proxy_loop(); + /// Does the proxying work. Signals startup success (or failure) via the promise. + void proxy_loop(std::promise); + void proxy_loop_init(); void proxy_conn_cleanup(); @@ -956,6 +961,9 @@ public: * Finish starting up: binds to the bind locations given in the constructor and launches the * proxy thread to handle message dispatching between remote nodes and worker threads. * + * Raises an exception if the proxy thread cannot be successfully started, such as if a bind + * error occurs. + * * Things you want to do before calling this: * - Use `add_category`/`add_command` to set up any commands remote connections can invoke. * - If any commands require SN authentication, specify a list of currently active service node diff --git a/oxenmq/proxy.cpp b/oxenmq/proxy.cpp index 8e5f7b5..5b8c73b 100644 --- a/oxenmq/proxy.cpp +++ b/oxenmq/proxy.cpp @@ -1,6 +1,8 @@ #include "oxenmq.h" #include "oxenmq-internal.h" #include "hex.h" +#include +#include #if defined(__FreeBSD__) || defined(__NetBSD__) || defined(__OpenBSD__) extern "C" { @@ -365,7 +367,7 @@ bool OxenMQ::proxy_bind(bind_data& b, size_t bind_index) { return true; } -void OxenMQ::proxy_loop() { +void OxenMQ::proxy_loop_init() { #if defined(__linux__) || defined(__sun) || defined(__MINGW32__) pthread_setname_np(pthread_self(), "omq-proxy"); @@ -420,7 +422,7 @@ void OxenMQ::proxy_loop() { for (size_t i = 0; i < bind.size(); i++) { if (!proxy_bind(bind[i], i)) { - OMQ_LOG(warn, "OxenMQ failed to listen on ", bind[i].address); + OMQ_LOG(fatal, "OxenMQ failed to listen on ", bind[i].address); throw zmq::error_t{}; } } @@ -451,18 +453,13 @@ void OxenMQ::proxy_loop() { if (!timers) timers.reset(zmq_timers_new()); - auto do_conn_cleanup = [this] { proxy_conn_cleanup(); }; - using CleanupLambda = decltype(do_conn_cleanup); if (-1 == zmq_timers_add(timers.get(), std::chrono::milliseconds{CONN_CHECK_INTERVAL}.count(), - // Wrap our lambda into a C function pointer where we pass in the lambda pointer as extra arg - [](int /*timer_id*/, void* cleanup) { (*static_cast(cleanup))(); }, - &do_conn_cleanup)) { + [](int /*timer_id*/, void* self) { static_cast(self)->proxy_conn_cleanup(); }, + this)) { throw zmq::error_t{}; } - std::vector parts; - // Wait for tagged worker threads to get ready and connect to us (we get a "STARTING" message) // and send them back a "START" to let them know to go ahead with startup. We need this // synchronization dance to guarantee that the workers are routable before we can proceed. @@ -471,7 +468,7 @@ void OxenMQ::proxy_loop() { std::unordered_set waiting_on; for (auto& w : tagged_workers) waiting_on.emplace(std::get(w).worker_routing_id); - for (; !waiting_on.empty(); parts.clear()) { + for (std::vector parts; !waiting_on.empty(); parts.clear()) { recv_message_parts(workers_socket, parts); if (parts.size() != 2 || view(parts[1]) != "STARTING"sv) { OMQ_LOG(error, "Received invalid message on worker socket while waiting for tagged thread startup"); @@ -489,6 +486,18 @@ void OxenMQ::proxy_loop() { route_control(workers_socket, std::get(w).worker_routing_id, "START"); } } +} + +void OxenMQ::proxy_loop(std::promise startup) { + try { + proxy_loop_init(); + } catch (...) { + startup.set_exception(std::current_exception()); + return; + } + startup.set_value(); + + std::vector parts; while (true) { std::chrono::milliseconds poll_timeout;