mirror of https://github.com/oxen-io/oxen-mq.git
Drop tagged thread init function; add synchronization dance
The init function doesn't seem all that useful and makes the interface a bit more complicated, so drop it. This also fixes a race condition in tagged thread startup: the proxy could try to talk to a tagged thread before that thread had connected, which aborted the proxy because it assumes workers are always routable. Tagged workers now send "STARTING" once connected, and the proxy waits to hear from every one of them before replying with "START".
This commit is contained in:
parent
8caab97355
commit
ae8dd27cdd
|
@ -146,10 +146,7 @@ TaggedThreadID LokiMQ::add_tagged_thread(std::string name, std::function<void()>
|
|||
run.worker_routing_id = "t" + std::to_string(run.worker_id);
|
||||
LMQ_TRACE("Created new tagged thread ", name, " with routing id ", run.worker_routing_id);
|
||||
|
||||
run.worker_thread = std::thread{[this, id=run.worker_id, name, init=std::move(init), start=std::move(start)] {
|
||||
if (init) init();
|
||||
return worker_thread(id, name, std::move(start));
|
||||
}};
|
||||
run.worker_thread = std::thread{&LokiMQ::worker_thread, this, run.worker_id, name, std::move(start)};
|
||||
|
||||
return TaggedThreadID{static_cast<int>(run.worker_id)};
|
||||
}
|
||||
|
|
|
@ -838,7 +838,7 @@ public:
|
|||
* \returns a TaggedThreadID object that can be passed to job(), batch(), or add_timer() to
|
||||
* direct the task to the tagged thread.
|
||||
*/
|
||||
TaggedThreadID add_tagged_thread(std::string name, std::function<void()> init = nullptr, std::function<void()> start = nullptr);
|
||||
TaggedThreadID add_tagged_thread(std::string name, std::function<void()> start = nullptr);
|
||||
|
||||
/**
|
||||
* Sets the number of worker threads reserved for batch jobs. If not explicitly called then
|
||||
|
|
|
@ -397,14 +397,35 @@ void LokiMQ::proxy_loop() {
|
|||
throw zmq::error_t{};
|
||||
}
|
||||
|
||||
// Tell any tagged workers to go ahead with startup:
|
||||
for (auto& w : tagged_workers) {
|
||||
LMQ_LOG(debug, "Telling tagged thread worker ", std::get<run_info>(w).worker_routing_id, " to finish startup");
|
||||
route_control(workers_socket, std::get<run_info>(w).worker_routing_id, "START");
|
||||
}
|
||||
|
||||
std::vector<zmq::message_t> parts;
|
||||
|
||||
// Wait for tagged worker threads to get ready and connect to us (we get a "STARTING" message)
|
||||
// and send them back a "START" to let them know to go ahead with startup. We need this
|
||||
// synchronization dance to guarantee that the workers are routable before we can proceed.
|
||||
if (!tagged_workers.empty()) {
|
||||
LMQ_LOG(debug, "Waiting for tagged workers");
|
||||
std::unordered_set<std::string_view> waiting_on;
|
||||
for (auto& w : tagged_workers)
|
||||
waiting_on.emplace(std::get<run_info>(w).worker_routing_id);
|
||||
for (; !waiting_on.empty(); parts.clear()) {
|
||||
recv_message_parts(workers_socket, parts);
|
||||
if (parts.size() != 2 || view(parts[1]) != "STARTING"sv) {
|
||||
LMQ_LOG(error, "Received invalid message on worker socket while waiting for tagged thread startup");
|
||||
continue;
|
||||
}
|
||||
LMQ_LOG(debug, "Received STARTING message from ", view(parts[0]));
|
||||
if (auto it = waiting_on.find(view(parts[0])); it != waiting_on.end())
|
||||
waiting_on.erase(it);
|
||||
else
|
||||
LMQ_LOG(error, "Received STARTING message from unknown worker ", view(parts[0]));
|
||||
}
|
||||
|
||||
for (auto&w : tagged_workers) {
|
||||
LMQ_LOG(debug, "Telling tagged thread worker ", std::get<run_info>(w).worker_routing_id, " to finish startup");
|
||||
route_control(workers_socket, std::get<run_info>(w).worker_routing_id, "START");
|
||||
}
|
||||
}
|
||||
|
||||
while (true) {
|
||||
std::chrono::milliseconds poll_timeout;
|
||||
if (max_workers == 0) { // Will be 0 only if we are quitting
|
||||
|
|
|
@ -5,6 +5,41 @@
|
|||
|
||||
namespace lokimq {
|
||||
|
||||
namespace {
|
||||
|
||||
// Blocks on the given socket until either the `expect` command or "QUIT" is received. Returns
|
||||
// true once `expect` arrives. If "QUIT" arrives instead, replies with "QUITTING" on the socket,
|
||||
// closes it, and returns false (the socket must not be used again by the caller in that case).
//
// `parts` is a caller-provided scratch buffer; it is cleared and refilled on every receive.
// `worker_id` is only used to label log messages.  Messages that are not exactly one part, and
// single-part messages that are neither `expect` nor "QUIT", are logged as errors and skipped:
// the function keeps waiting rather than returning.
|
||||
[[gnu::always_inline]] inline
|
||||
bool worker_wait_for(LokiMQ& lmq, zmq::socket_t& sock, std::vector<zmq::message_t>& parts, const std::string_view worker_id, const std::string_view expect) {
|
||||
while (true) {
|
||||
lmq.log(LogLevel::debug, __FILE__, __LINE__, "worker ", worker_id, " waiting for ", expect);
|
||||
// Drop whatever the previous iteration (or the caller) left behind before receiving again.
parts.clear();
|
||||
recv_message_parts(sock, parts);
|
||||
// Only single-part control messages are accepted here; anything else is reported and ignored.
if (parts.size() != 1) {
|
||||
lmq.log(LogLevel::error, __FILE__, __LINE__, "Internal error: worker ", worker_id, " received invalid ", parts.size(), "-part control msg");
|
||||
continue;
|
||||
}
|
||||
auto command = view(parts[0]);
|
||||
if (command == expect) {
|
||||
// The trace-level confirmation is compiled in only when NDEBUG is not defined.
#ifndef NDEBUG
|
||||
lmq.log(LogLevel::trace, __FILE__, __LINE__, "Worker ", worker_id, " received waited-for ", expect, " command");
|
||||
#endif
|
||||
return true;
|
||||
} else if (command == "QUIT"sv) {
|
||||
lmq.log(LogLevel::debug, __FILE__, __LINE__, "Worker ", worker_id, " received QUIT command, shutting down");
|
||||
// Acknowledge the shutdown request, then close the socket with ZMQ_LINGER set to 1000
// (presumably so the queued "QUITTING" reply still has a bounded chance to be delivered).
detail::send_control(sock, "QUITTING");
|
||||
sock.setsockopt<int>(ZMQ_LINGER, 1000);
|
||||
sock.close();
|
||||
return false;
|
||||
} else {
|
||||
// Unknown command: report it and fall through to the next loop iteration.
lmq.log(LogLevel::error, __FILE__, __LINE__, "Internal error: worker ", worker_id, " received invalid command: `", command, "'");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
void LokiMQ::worker_thread(unsigned int index, std::optional<std::string> tagged, std::function<void()> start) {
|
||||
std::string routing_id = (tagged ? "t" : "w") + std::to_string(index); // for routing
|
||||
std::string_view worker_id{tagged ? *tagged : routing_id}; // for debug
|
||||
|
@ -23,6 +58,8 @@ void LokiMQ::worker_thread(unsigned int index, std::optional<std::string> tagged
|
|||
sock.setsockopt(ZMQ_ROUTING_ID, routing_id.data(), routing_id.size());
|
||||
LMQ_LOG(debug, "New worker thread ", worker_id, " (", routing_id, ") started");
|
||||
sock.connect(SN_ADDR_WORKERS);
|
||||
if (tagged)
|
||||
detail::send_control(sock, "STARTING");
|
||||
|
||||
Message message{*this, 0, AuthLevel::none, ""s};
|
||||
std::vector<zmq::message_t> parts;
|
||||
|
@ -36,28 +73,9 @@ void LokiMQ::worker_thread(unsigned int index, std::optional<std::string> tagged
|
|||
|
||||
waiting_for_command = true;
|
||||
|
||||
while (true) {
|
||||
LMQ_TRACE("tagged worker ", worker_id, " waiting for startup signal");
|
||||
recv_message_parts(sock, parts);
|
||||
if (parts.size() != 1) {
|
||||
LMQ_LOG(error, "Internal error: worker ", worker_id, " received invalid ", parts.size(), "-part startup msg");
|
||||
continue;
|
||||
}
|
||||
auto command = view(parts[0]);
|
||||
if (command == "START"sv) {
|
||||
LMQ_LOG(debug, "Tagged worker ", worker_id, " received start signal, starting up");
|
||||
if (start) start();
|
||||
break;
|
||||
} else if (command == "QUIT"sv) {
|
||||
LMQ_LOG(debug, "Tagged worker ", worker_id, " shutting down");
|
||||
detail::send_control(sock, "QUITTING");
|
||||
sock.setsockopt<int>(ZMQ_LINGER, 1000);
|
||||
sock.close();
|
||||
return;
|
||||
} else {
|
||||
LMQ_LOG(error, "Internal error: worker ", worker_id, " received invalid command: `", command, "'");
|
||||
}
|
||||
}
|
||||
if (!worker_wait_for(*this, sock, parts, worker_id, "START"sv))
|
||||
return;
|
||||
if (start) start();
|
||||
} else {
|
||||
// Otherwise for a regular worker we can only be started by an active main proxy thread
|
||||
// which will have preloaded our first job so we can start off right away.
|
||||
|
@ -68,28 +86,9 @@ void LokiMQ::worker_thread(unsigned int index, std::optional<std::string> tagged
|
|||
run_info& run = tagged ? std::get<run_info>(tagged_workers[index - 1]) : workers[index];
|
||||
|
||||
while (true) {
|
||||
while (waiting_for_command) {
|
||||
LMQ_TRACE("worker ", worker_id, " waiting for command");
|
||||
parts.clear();
|
||||
recv_message_parts(sock, parts);
|
||||
|
||||
if (parts.size() != 1) {
|
||||
LMQ_LOG(error, "Internal error: worker ", worker_id, " received invalid ", parts.size(), "-part worker instruction");
|
||||
continue;
|
||||
}
|
||||
auto command = view(parts[0]);
|
||||
if (command == "RUN"sv) {
|
||||
LMQ_LOG(debug, "worker ", worker_id, " running ", run.is_batch_job ? "batch"s : "command " + run.command);
|
||||
waiting_for_command = false;
|
||||
} else if (command == "QUIT"sv) {
|
||||
LMQ_LOG(debug, "worker ", worker_id, " shutting down");
|
||||
detail::send_control(sock, "QUITTING");
|
||||
sock.setsockopt<int>(ZMQ_LINGER, 1000);
|
||||
sock.close();
|
||||
if (waiting_for_command) {
|
||||
if (!worker_wait_for(*this, sock, parts, worker_id, "RUN"sv))
|
||||
return;
|
||||
} else {
|
||||
LMQ_LOG(error, "Internal error: worker ", worker_id, " received invalid command: `", command, "'");
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
|
|
|
@ -2,17 +2,16 @@
|
|||
#include "common.h"
|
||||
#include <future>
|
||||
|
||||
TEST_CASE("tagged thread init and start functions", "[tagged][init]") {
|
||||
TEST_CASE("tagged thread start functions", "[tagged][start]") {
|
||||
lokimq::LokiMQ lmq{get_logger(""), LogLevel::trace};
|
||||
|
||||
lmq.set_general_threads(2);
|
||||
lmq.set_batch_threads(2);
|
||||
auto t_abc = lmq.add_tagged_thread("abc");
|
||||
std::atomic<bool> init_called = false, initghi_called = false, start_called = false;
|
||||
auto t_def = lmq.add_tagged_thread("def", [&] { init_called = true; });
|
||||
auto t_ghi = lmq.add_tagged_thread("def", [&] { initghi_called = true; }, [&] { start_called = false; });
|
||||
std::atomic<bool> start_called = false;
|
||||
auto t_def = lmq.add_tagged_thread("def", [&] { start_called = true; });
|
||||
|
||||
wait_for([&] { return init_called.load() && initghi_called.load(); });
|
||||
std::this_thread::sleep_for(20ms);
|
||||
{
|
||||
auto lock = catch_lock();
|
||||
REQUIRE_FALSE( start_called );
|
||||
|
@ -22,7 +21,7 @@ TEST_CASE("tagged thread init and start functions", "[tagged][init]") {
|
|||
wait_for([&] { return start_called.load(); });
|
||||
{
|
||||
auto lock = catch_lock();
|
||||
REQUIRE_FALSE( start_called );
|
||||
REQUIRE( start_called );
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue