Fix a race condition with tagged thread startup

There's a very rare race condition where a tagged thread doesn't appear to
exist yet when the proxy tries to synchronize startup with it, and so the
proxy thread hangs during startup.

This addresses it by no longer reading the `proxy_thread` variable (which
probably isn't thread-safe) during the worker's startup, and instead
signalling the need-to-shutdown condition via a third state of the (formerly
boolean) `tagged_go`.
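
As a minimal illustration of the pattern (a sketch only; the `startup_gate`, `signal`, and `worker` names below are hypothetical, not OxenMQ's actual API): a worker blocks on a condition variable until the shared state leaves WAIT, then decides whether to start or exit based on whether it saw GO or SHUTDOWN.

```cpp
// Minimal sketch of the three-state startup handshake (hypothetical names,
// not OxenMQ's actual types).
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

enum class go_mode { WAIT, GO, SHUTDOWN };

struct startup_gate {
    std::mutex mut;
    std::condition_variable cv;
    go_mode go = go_mode::WAIT;

    // Called by the proxy thread (GO) or the destructor (SHUTDOWN): update
    // the state under the mutex, then wake every waiting worker.
    void signal(go_mode m) {
        {
            std::lock_guard lock{mut};
            go = m;
        }
        cv.notify_all();
    }
};

void worker(startup_gate& gate) {
    go_mode m;
    {
        std::unique_lock lock{gate.mut};
        gate.cv.wait(lock, [&] { return gate.go != go_mode::WAIT; });
        m = gate.go;  // copy the state out while still holding the mutex
    }
    // The decision to start or exit comes entirely from the mutex-protected
    // state; the worker never touches the proxy's std::thread object.
    if (m == go_mode::SHUTDOWN) {
        std::cout << "worker: shutdown requested before startup\n";
        return;
    }
    std::cout << "worker: starting\n";
}

int main() {
    startup_gate gate;
    std::thread t{worker, std::ref(gate)};
    gate.signal(go_mode::GO);  // the destructor path would signal SHUTDOWN
    t.join();
}
```

The third SHUTDOWN state is what lets a worker exit cleanly when the object is destroyed before the proxy ever started, without consulting `proxy_thread.joinable()`.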
Author: Jason Rhinelander, 2022-10-05 18:01:40 -03:00
Commit: 445f214840 (parent: 358005df06)
4 changed files with 6 additions and 5 deletions

@@ -431,7 +431,7 @@ OxenMQ::~OxenMQ() {
     // up, so signal them so that they can end themselves.
     {
         std::lock_guard lock{tagged_startup_mutex};
-        tagged_go = true;
+        tagged_go = tagged_go_mode::SHUTDOWN;
     }
     tagged_cv.notify_all();
     for (auto& [run, busy, queue] : tagged_workers)

@@ -788,7 +788,8 @@ private:
     /// then wait via this bool/c.v. to synchronize startup with the proxy thread. This mutex isn't
     /// used after startup is complete.
     std::mutex tagged_startup_mutex;
-    bool tagged_go{false};
+    enum class tagged_go_mode { WAIT, GO, SHUTDOWN };
+    tagged_go_mode tagged_go = tagged_go_mode::WAIT;
     std::condition_variable tagged_cv;
 public:

@@ -455,7 +455,7 @@ void OxenMQ::proxy_loop_init() {
     OMQ_LOG(debug, "Waiting for tagged workers");
     {
         std::unique_lock lock{tagged_startup_mutex};
-        tagged_go = true;
+        tagged_go = tagged_go_mode::GO;
     }
     tagged_cv.notify_all();
     std::unordered_set<std::string_view> waiting_on;

@@ -71,9 +71,9 @@ void OxenMQ::worker_thread(unsigned int index, std::optional<std::string> tagged
         // is running).
         {
             std::unique_lock lock{tagged_startup_mutex};
-            tagged_cv.wait(lock, [this] { return tagged_go; });
+            tagged_cv.wait(lock, [this] { return tagged_go != tagged_go_mode::WAIT; });
         }
-        if (!proxy_thread.joinable()) // OxenMQ destroyed without starting
+        if (tagged_go == tagged_go_mode::SHUTDOWN) // OxenMQ destroyed without starting
             return;
         tagged_socket.emplace(context, zmq::socket_type::dealer);
     }