Propagate proxy thread startup exceptions

Currently if the proxy thread fails to start (typically because a bind
fails) the exception is thrown in the proxy thread, where the caller
cannot catch it, and so it aborts the program.

This makes startup failures nicer to deal with by transporting such
exceptions back to the start() call, where the caller can catch them.
Jason Rhinelander 2021-11-30 13:57:53 -04:00
parent e382373f2e
commit 85d35fa505
3 changed files with 36 additions and 13 deletions
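
For background, the change relies on the standard C++ promise/future handoff:
start() keeps a std::future<void> and blocks on it, while the proxy thread
reports startup success or failure through the matching std::promise<void>, so
an exception thrown during proxy startup is rethrown at the call site instead
of terminating the process. Below is a minimal standalone sketch of that
pattern; the function names and the simulated bind failure are illustrative
only and not taken from the OxenMQ sources.

#include <future>
#include <iostream>
#include <stdexcept>
#include <thread>

// Illustrative stand-in for the proxy's startup work; assume it throws on a failed bind.
static void do_startup_binds() {
    throw std::runtime_error("failed to bind tcp://0.0.0.0:5555");
}

// Thread body: takes the promise by value and reports startup success or failure through it.
static void thread_body(std::promise<void> started) {
    try {
        do_startup_binds();
    } catch (...) {
        started.set_exception(std::current_exception());
        return;  // startup failed; nothing more for this thread to do
    }
    started.set_value();
    // ... the long-running event loop would go here ...
}

int main() {
    std::promise<void> started;
    auto ready = started.get_future();
    std::thread t{thread_body, std::move(started)};
    try {
        ready.get();  // blocks until the thread reports in; rethrows anything set via set_exception()
        std::cout << "thread started\n";
    } catch (const std::exception& e) {
        std::cout << "thread startup failed: " << e.what() << "\n";
    }
    t.join();
}

The proxy_loop(std::promise<void>) / start() changes in the diffs below follow
the same shape.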

View File

@@ -5,6 +5,7 @@
 #include <random>
 #include <ostream>
 #include <thread>
+#include <future>
 
 extern "C" {
 #include <sodium/core.h>
@@ -244,7 +245,12 @@ void OxenMQ::start() {
     // bound socket, but we do nothing else here: the proxy thread is responsible for everything
     // except binding it.
     command.bind(SN_ADDR_COMMAND);
-    proxy_thread = std::thread{&OxenMQ::proxy_loop, this};
+    std::promise<void> startup_prom;
+    auto proxy_startup = startup_prom.get_future();
+    proxy_thread = std::thread{&OxenMQ::proxy_loop, this, std::move(startup_prom)};
+    OMQ_LOG(debug, "Waiting for proxy thread to initialize...");
+    proxy_startup.get(); // Rethrows exceptions from the proxy startup (e.g. failure to bind)
 
     OMQ_LOG(debug, "Waiting for proxy thread to get ready...");
     auto &control = get_control_socket();

View File

@@ -56,6 +56,10 @@
 #error "ZMQ >= 4.3.0 required"
 #endif
 
+namespace std {
+    template <class R> class promise;
+}
+
 namespace oxenmq {
 
 using namespace std::literals;
@@ -467,8 +471,9 @@ private:
     /// processible without having to shove it onto a socket, such as scheduling an internal job).
     bool proxy_skip_one_poll = false;
 
-    /// Does the proxying work
-    void proxy_loop();
+    /// Does the proxying work. Signals startup success (or failure) via the promise.
+    void proxy_loop(std::promise<void>);
+    void proxy_loop_init();
 
     void proxy_conn_cleanup();
@@ -956,6 +961,9 @@ public:
      * Finish starting up: binds to the bind locations given in the constructor and launches the
      * proxy thread to handle message dispatching between remote nodes and worker threads.
      *
+     * Raises an exception if the proxy thread cannot be successfully started, such as if a bind
+     * error occurs.
+     *
      * Things you want to do before calling this:
      * - Use `add_category`/`add_command` to set up any commands remote connections can invoke.
      * - If any commands require SN authentication, specify a list of currently active service node

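The documentation added above describes the caller-visible contract: start()
can now throw if the proxy thread fails to come up, e.g. on a bind error. A
small hypothetical sketch of handling that from the caller's side (it assumes
the default OxenMQ constructor, the installed <oxenmq/oxenmq.h> header path,
and a plain-text listener; the address and error handling are illustrative):

#include <oxenmq/oxenmq.h>

#include <exception>
#include <iostream>

int main() {
    oxenmq::OxenMQ omq;  // assumes the default constructor; real code usually passes keys and a logger
    omq.listen_plain("tcp://127.0.0.1:4567");

    try {
        omq.start();  // with this change, a failure to bind is rethrown here...
    } catch (const std::exception& e) {
        std::cerr << "Failed to start OxenMQ: " << e.what() << "\n";
        return 1;     // ...instead of aborting inside the proxy thread
    }

    // Normal operation continues; the OxenMQ destructor shuts the proxy down on exit.
    return 0;
}

Previously the same bind error would have escaped from the proxy thread itself
and terminated the process; catching it around start() was not possible.
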
View File

@@ -1,6 +1,8 @@
 #include "oxenmq.h"
 #include "oxenmq-internal.h"
 #include "hex.h"
+#include <exception>
+#include <future>
 
 #if defined(__FreeBSD__) || defined(__NetBSD__) || defined(__OpenBSD__)
 extern "C" {
@@ -365,7 +367,7 @@ bool OxenMQ::proxy_bind(bind_data& b, size_t bind_index) {
     return true;
 }
 
-void OxenMQ::proxy_loop() {
+void OxenMQ::proxy_loop_init() {
 
 #if defined(__linux__) || defined(__sun) || defined(__MINGW32__)
     pthread_setname_np(pthread_self(), "omq-proxy");
@@ -420,7 +422,7 @@ void OxenMQ::proxy_loop() {
     for (size_t i = 0; i < bind.size(); i++) {
         if (!proxy_bind(bind[i], i)) {
-            OMQ_LOG(warn, "OxenMQ failed to listen on ", bind[i].address);
+            OMQ_LOG(fatal, "OxenMQ failed to listen on ", bind[i].address);
             throw zmq::error_t{};
         }
     }
@@ -451,18 +453,13 @@ void OxenMQ::proxy_loop() {
     if (!timers)
         timers.reset(zmq_timers_new());
 
-    auto do_conn_cleanup = [this] { proxy_conn_cleanup(); };
-    using CleanupLambda = decltype(do_conn_cleanup);
     if (-1 == zmq_timers_add(timers.get(),
             std::chrono::milliseconds{CONN_CHECK_INTERVAL}.count(),
-            // Wrap our lambda into a C function pointer where we pass in the lambda pointer as extra arg
-            [](int /*timer_id*/, void* cleanup) { (*static_cast<CleanupLambda*>(cleanup))(); },
-            &do_conn_cleanup)) {
+            [](int /*timer_id*/, void* self) { static_cast<OxenMQ*>(self)->proxy_conn_cleanup(); },
+            this)) {
         throw zmq::error_t{};
     }
 
-    std::vector<zmq::message_t> parts;
-
     // Wait for tagged worker threads to get ready and connect to us (we get a "STARTING" message)
     // and send them back a "START" to let them know to go ahead with startup. We need this
     // synchronization dance to guarantee that the workers are routable before we can proceed.
@@ -471,7 +468,7 @@ void OxenMQ::proxy_loop() {
     std::unordered_set<std::string_view> waiting_on;
     for (auto& w : tagged_workers)
         waiting_on.emplace(std::get<run_info>(w).worker_routing_id);
-    for (; !waiting_on.empty(); parts.clear()) {
+    for (std::vector<zmq::message_t> parts; !waiting_on.empty(); parts.clear()) {
         recv_message_parts(workers_socket, parts);
         if (parts.size() != 2 || view(parts[1]) != "STARTING"sv) {
             OMQ_LOG(error, "Received invalid message on worker socket while waiting for tagged thread startup");
@@ -489,6 +486,18 @@ void OxenMQ::proxy_loop() {
            route_control(workers_socket, std::get<run_info>(w).worker_routing_id, "START");
         }
     }
+}
+
+void OxenMQ::proxy_loop(std::promise<void> startup) {
+    try {
+        proxy_loop_init();
+    } catch (...) {
+        startup.set_exception(std::current_exception());
+        return;
+    }
+    startup.set_value();
+
+    std::vector<zmq::message_t> parts;
 
     while (true) {
         std::chrono::milliseconds poll_timeout;