diff --git a/oxenmq/oxenmq.cpp b/oxenmq/oxenmq.cpp index e53f499..a3539c1 100644 --- a/oxenmq/oxenmq.cpp +++ b/oxenmq/oxenmq.cpp @@ -5,6 +5,7 @@ #include #include #include +#include extern "C" { #include @@ -244,7 +245,12 @@ void OxenMQ::start() { // bound socket, but we do nothing else here: the proxy thread is responsible for everything // except binding it. command.bind(SN_ADDR_COMMAND); - proxy_thread = std::thread{&OxenMQ::proxy_loop, this}; + std::promise startup_prom; + auto proxy_startup = startup_prom.get_future(); + proxy_thread = std::thread{&OxenMQ::proxy_loop, this, std::move(startup_prom)}; + + OMQ_LOG(debug, "Waiting for proxy thread to initialize..."); + proxy_startup.get(); // Rethrows exceptions from the proxy startup (e.g. failure to bind) OMQ_LOG(debug, "Waiting for proxy thread to get ready..."); auto &control = get_control_socket(); diff --git a/oxenmq/oxenmq.h b/oxenmq/oxenmq.h index b83fcca..e870e99 100644 --- a/oxenmq/oxenmq.h +++ b/oxenmq/oxenmq.h @@ -56,6 +56,10 @@ #error "ZMQ >= 4.3.0 required" #endif +namespace std { +template class promise; +} + namespace oxenmq { using namespace std::literals; @@ -467,8 +471,9 @@ private: /// processible without having to shove it onto a socket, such as scheduling an internal job). bool proxy_skip_one_poll = false; - /// Does the proxying work - void proxy_loop(); + /// Does the proxying work. Signals startup success (or failure) via the promise. + void proxy_loop(std::promise); + void proxy_loop_init(); void proxy_conn_cleanup(); @@ -956,6 +961,9 @@ public: * Finish starting up: binds to the bind locations given in the constructor and launches the * proxy thread to handle message dispatching between remote nodes and worker threads. * + * Raises an exception if the proxy thread cannot be successfully started, such as if a bind + * error occurs. + * * Things you want to do before calling this: * - Use `add_category`/`add_command` to set up any commands remote connections can invoke. * - If any commands require SN authentication, specify a list of currently active service node diff --git a/oxenmq/proxy.cpp b/oxenmq/proxy.cpp index 8e5f7b5..5b8c73b 100644 --- a/oxenmq/proxy.cpp +++ b/oxenmq/proxy.cpp @@ -1,6 +1,8 @@ #include "oxenmq.h" #include "oxenmq-internal.h" #include "hex.h" +#include +#include #if defined(__FreeBSD__) || defined(__NetBSD__) || defined(__OpenBSD__) extern "C" { @@ -365,7 +367,7 @@ bool OxenMQ::proxy_bind(bind_data& b, size_t bind_index) { return true; } -void OxenMQ::proxy_loop() { +void OxenMQ::proxy_loop_init() { #if defined(__linux__) || defined(__sun) || defined(__MINGW32__) pthread_setname_np(pthread_self(), "omq-proxy"); @@ -420,7 +422,7 @@ void OxenMQ::proxy_loop() { for (size_t i = 0; i < bind.size(); i++) { if (!proxy_bind(bind[i], i)) { - OMQ_LOG(warn, "OxenMQ failed to listen on ", bind[i].address); + OMQ_LOG(fatal, "OxenMQ failed to listen on ", bind[i].address); throw zmq::error_t{}; } } @@ -451,18 +453,13 @@ void OxenMQ::proxy_loop() { if (!timers) timers.reset(zmq_timers_new()); - auto do_conn_cleanup = [this] { proxy_conn_cleanup(); }; - using CleanupLambda = decltype(do_conn_cleanup); if (-1 == zmq_timers_add(timers.get(), std::chrono::milliseconds{CONN_CHECK_INTERVAL}.count(), - // Wrap our lambda into a C function pointer where we pass in the lambda pointer as extra arg - [](int /*timer_id*/, void* cleanup) { (*static_cast(cleanup))(); }, - &do_conn_cleanup)) { + [](int /*timer_id*/, void* self) { static_cast(self)->proxy_conn_cleanup(); }, + this)) { throw zmq::error_t{}; } - std::vector parts; - // Wait for tagged worker threads to get ready and connect to us (we get a "STARTING" message) // and send them back a "START" to let them know to go ahead with startup. We need this // synchronization dance to guarantee that the workers are routable before we can proceed. @@ -471,7 +468,7 @@ void OxenMQ::proxy_loop() { std::unordered_set waiting_on; for (auto& w : tagged_workers) waiting_on.emplace(std::get(w).worker_routing_id); - for (; !waiting_on.empty(); parts.clear()) { + for (std::vector parts; !waiting_on.empty(); parts.clear()) { recv_message_parts(workers_socket, parts); if (parts.size() != 2 || view(parts[1]) != "STARTING"sv) { OMQ_LOG(error, "Received invalid message on worker socket while waiting for tagged thread startup"); @@ -489,6 +486,18 @@ void OxenMQ::proxy_loop() { route_control(workers_socket, std::get(w).worker_routing_id, "START"); } } +} + +void OxenMQ::proxy_loop(std::promise startup) { + try { + proxy_loop_init(); + } catch (...) { + startup.set_exception(std::current_exception()); + return; + } + startup.set_value(); + + std::vector parts; while (true) { std::chrono::milliseconds poll_timeout;