mirror of https://github.com/oxen-io/oxen-mq.git
Merge pull request #82 from oxen-io/fix-race-condition
Attempt to fix a race condition
This commit is contained in:
commit
2e308d4f43
|
@ -431,7 +431,7 @@ OxenMQ::~OxenMQ() {
|
||||||
// up, so signal them so that they can end themselves.
|
// up, so signal them so that they can end themselves.
|
||||||
{
|
{
|
||||||
std::lock_guard lock{tagged_startup_mutex};
|
std::lock_guard lock{tagged_startup_mutex};
|
||||||
tagged_go = true;
|
tagged_go = tagged_go_mode::SHUTDOWN;
|
||||||
}
|
}
|
||||||
tagged_cv.notify_all();
|
tagged_cv.notify_all();
|
||||||
for (auto& [run, busy, queue] : tagged_workers)
|
for (auto& [run, busy, queue] : tagged_workers)
|
||||||
|
|
|
@ -788,7 +788,8 @@ private:
|
||||||
/// then wait via this bool/c.v. to synchronize startup with the proxy thread. This mutex isn't
|
/// then wait via this bool/c.v. to synchronize startup with the proxy thread. This mutex isn't
|
||||||
/// used after startup is complete.
|
/// used after startup is complete.
|
||||||
std::mutex tagged_startup_mutex;
|
std::mutex tagged_startup_mutex;
|
||||||
bool tagged_go{false};
|
enum class tagged_go_mode { WAIT, GO, SHUTDOWN };
|
||||||
|
tagged_go_mode tagged_go = tagged_go_mode::WAIT;
|
||||||
std::condition_variable tagged_cv;
|
std::condition_variable tagged_cv;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
|
|
@ -455,7 +455,7 @@ void OxenMQ::proxy_loop_init() {
|
||||||
OMQ_LOG(debug, "Waiting for tagged workers");
|
OMQ_LOG(debug, "Waiting for tagged workers");
|
||||||
{
|
{
|
||||||
std::unique_lock lock{tagged_startup_mutex};
|
std::unique_lock lock{tagged_startup_mutex};
|
||||||
tagged_go = true;
|
tagged_go = tagged_go_mode::GO;
|
||||||
}
|
}
|
||||||
tagged_cv.notify_all();
|
tagged_cv.notify_all();
|
||||||
std::unordered_set<std::string_view> waiting_on;
|
std::unordered_set<std::string_view> waiting_on;
|
||||||
|
|
|
@ -71,9 +71,9 @@ void OxenMQ::worker_thread(unsigned int index, std::optional<std::string> tagged
|
||||||
// is running).
|
// is running).
|
||||||
{
|
{
|
||||||
std::unique_lock lock{tagged_startup_mutex};
|
std::unique_lock lock{tagged_startup_mutex};
|
||||||
tagged_cv.wait(lock, [this] { return tagged_go; });
|
tagged_cv.wait(lock, [this] { return tagged_go != tagged_go_mode::WAIT; });
|
||||||
}
|
}
|
||||||
if (!proxy_thread.joinable()) // OxenMQ destroyed without starting
|
if (tagged_go == tagged_go_mode::SHUTDOWN) // OxenMQ destroyed without starting
|
||||||
return;
|
return;
|
||||||
tagged_socket.emplace(context, zmq::socket_type::dealer);
|
tagged_socket.emplace(context, zmq::socket_type::dealer);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue