diff --git a/oxenmq/oxenmq.cpp b/oxenmq/oxenmq.cpp index 73706bd..06bfdd2 100644 --- a/oxenmq/oxenmq.cpp +++ b/oxenmq/oxenmq.cpp @@ -431,7 +431,7 @@ OxenMQ::~OxenMQ() { // up, so signal them so that they can end themselves. { std::lock_guard lock{tagged_startup_mutex}; - tagged_go = true; + tagged_go = tagged_go_mode::SHUTDOWN; } tagged_cv.notify_all(); for (auto& [run, busy, queue] : tagged_workers) diff --git a/oxenmq/oxenmq.h b/oxenmq/oxenmq.h index a3e3361..fe53627 100644 --- a/oxenmq/oxenmq.h +++ b/oxenmq/oxenmq.h @@ -788,7 +788,8 @@ private: /// then wait via this bool/c.v. to synchronize startup with the proxy thread. This mutex isn't /// used after startup is complete. std::mutex tagged_startup_mutex; - bool tagged_go{false}; + enum class tagged_go_mode { WAIT, GO, SHUTDOWN }; + tagged_go_mode tagged_go = tagged_go_mode::WAIT; std::condition_variable tagged_cv; public: diff --git a/oxenmq/proxy.cpp b/oxenmq/proxy.cpp index 5732fe1..aa0c166 100644 --- a/oxenmq/proxy.cpp +++ b/oxenmq/proxy.cpp @@ -455,7 +455,7 @@ void OxenMQ::proxy_loop_init() { OMQ_LOG(debug, "Waiting for tagged workers"); { std::unique_lock lock{tagged_startup_mutex}; - tagged_go = true; + tagged_go = tagged_go_mode::GO; } tagged_cv.notify_all(); std::unordered_set waiting_on; diff --git a/oxenmq/worker.cpp b/oxenmq/worker.cpp index fb54735..b54ad93 100644 --- a/oxenmq/worker.cpp +++ b/oxenmq/worker.cpp @@ -71,9 +71,9 @@ void OxenMQ::worker_thread(unsigned int index, std::optional tagged // is running). { std::unique_lock lock{tagged_startup_mutex}; - tagged_cv.wait(lock, [this] { return tagged_go; }); + tagged_cv.wait(lock, [this] { return tagged_go != tagged_go_mode::WAIT; }); } - if (!proxy_thread.joinable()) // OxenMQ destroyed without starting + if (tagged_go == tagged_go_mode::SHUTDOWN) // OxenMQ destroyed without starting return; tagged_socket.emplace(context, zmq::socket_type::dealer); }