From 445f214840c2ba2fe4fc89f9f4592af23a3f0f91 Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Wed, 5 Oct 2022 18:01:40 -0300 Subject: [PATCH] Fix a race condition with tagged thread startup There's a very rare race condition where a tagged thread doesn't seem to exist when the proxy tries syncing startup with them, and so the proxy thread hangs in startup. This addresses it by avoiding looking at the `proxy_thread` variable (which probably isn't thread safe) in the worker's startup, and signalling the you-need-to-shutdown condition via a third option for the (formerly boolean) `tagged_go`. --- oxenmq/oxenmq.cpp | 2 +- oxenmq/oxenmq.h | 3 ++- oxenmq/proxy.cpp | 2 +- oxenmq/worker.cpp | 4 ++-- 4 files changed, 6 insertions(+), 5 deletions(-) diff --git a/oxenmq/oxenmq.cpp b/oxenmq/oxenmq.cpp index 73706bd..06bfdd2 100644 --- a/oxenmq/oxenmq.cpp +++ b/oxenmq/oxenmq.cpp @@ -431,7 +431,7 @@ OxenMQ::~OxenMQ() { // up, so signal them so that they can end themselves. { std::lock_guard lock{tagged_startup_mutex}; - tagged_go = true; + tagged_go = tagged_go_mode::SHUTDOWN; } tagged_cv.notify_all(); for (auto& [run, busy, queue] : tagged_workers) diff --git a/oxenmq/oxenmq.h b/oxenmq/oxenmq.h index a3e3361..fe53627 100644 --- a/oxenmq/oxenmq.h +++ b/oxenmq/oxenmq.h @@ -788,7 +788,8 @@ private: /// then wait via this bool/c.v. to synchronize startup with the proxy thread. This mutex isn't /// used after startup is complete. std::mutex tagged_startup_mutex; - bool tagged_go{false}; + enum class tagged_go_mode { WAIT, GO, SHUTDOWN }; + tagged_go_mode tagged_go = tagged_go_mode::WAIT; std::condition_variable tagged_cv; public: diff --git a/oxenmq/proxy.cpp b/oxenmq/proxy.cpp index 5732fe1..aa0c166 100644 --- a/oxenmq/proxy.cpp +++ b/oxenmq/proxy.cpp @@ -455,7 +455,7 @@ void OxenMQ::proxy_loop_init() { OMQ_LOG(debug, "Waiting for tagged workers"); { std::unique_lock lock{tagged_startup_mutex}; - tagged_go = true; + tagged_go = tagged_go_mode::GO; } tagged_cv.notify_all(); std::unordered_set waiting_on; diff --git a/oxenmq/worker.cpp b/oxenmq/worker.cpp index fb54735..b54ad93 100644 --- a/oxenmq/worker.cpp +++ b/oxenmq/worker.cpp @@ -71,9 +71,9 @@ void OxenMQ::worker_thread(unsigned int index, std::optional tagged // is running). { std::unique_lock lock{tagged_startup_mutex}; - tagged_cv.wait(lock, [this] { return tagged_go; }); + tagged_cv.wait(lock, [this] { return tagged_go != tagged_go_mode::WAIT; }); } - if (!proxy_thread.joinable()) // OxenMQ destroyed without starting + if (tagged_go == tagged_go_mode::SHUTDOWN) // OxenMQ destroyed without starting return; tagged_socket.emplace(context, zmq::socket_type::dealer); }