Fix proxy thread stall when workers fill up

When we hit the limit on the number of workers the proxy thread would
stop processing incoming messages, sending it into an infinite loop of
death.  The check was supposed to use `active_workers()` rather than
`workers.size()`, but even that isn't quite right: we want to *always*
pull all incoming messages off and queue them internally since different
categories have their own queue sizes (and so we have to pull it off to
know whether we want to keep it -- if spare category queue room -- or
drop it).
This commit is contained in:
Jason Rhinelander 2020-04-21 16:55:40 -03:00
parent 0ebfef2164
commit 6ddf033674
3 changed files with 43 additions and 2 deletions

View File

@ -28,7 +28,7 @@ void LokiMQ::proxy_schedule_reply_job(std::function<void()> f) {
}
void LokiMQ::proxy_run_batch_jobs(std::queue<batch_job>& jobs, const int reserved, int& active, bool reply) {
while (!jobs.empty() && static_cast<int>(workers.size()) < max_workers &&
while (!jobs.empty() && active_workers() < max_workers &&
(active < reserved || active_workers() < general_workers)) {
proxy_run_worker(get_idle_worker().load(std::move(jobs.front()), reply));
jobs.pop();

View File

@ -436,7 +436,7 @@ void LokiMQ::proxy_loop() {
for (int i = 0; i < num_sockets; i++)
queue_index.push(i);
for (parts.clear(); !queue_index.empty() && static_cast<int>(workers.size()) < max_workers; parts.clear()) {
for (parts.clear(); !queue_index.empty(); parts.clear()) {
size_t i = queue_index.front();
queue_index.pop();
auto& sock = connections[i];

View File

@ -288,3 +288,44 @@ TEST_CASE("SN auth checks", "[sandwich][auth]") {
REQUIRE( data == dvec{{"FORBIDDEN_SN"}} );
}
}
TEST_CASE("SN single worker test", "[connect][worker]") {
// Tests a failure case that could trigger when all workers are allocated (here we make that
// simpler by just having one worker).
std::string listen = "tcp://127.0.0.1:4455";
LokiMQ server{
"", "",
false, // service node
[](auto) { return ""; },
get_logger(""),
LogLevel::trace
};
server.set_general_threads(1);
server.set_batch_threads(0);
server.set_reply_threads(0);
server.listen_plain(listen);
server.add_category("c", Access{AuthLevel::none})
.add_request_command("x", [&](Message& m) { m.send_reply(); })
;
server.start();
LokiMQ client{get_logger(""), LogLevel::trace};
client.start();
auto conn = client.connect_remote(listen, [](auto) {}, [](auto, auto) {});
std::atomic<int> got{0};
std::atomic<int> success{0};
client.request(conn, "c.x", [&](auto success_, auto) { if (success_) ++success; ++got; });
wait_for([&] { return got.load() >= 1; });
{
auto lock = catch_lock();
REQUIRE( success == 1 );
}
client.request(conn, "c.x", [&](auto success_, auto) { if (success_) ++success; ++got; });
wait_for([&] { return got.load() >= 2; });
{
auto lock = catch_lock();
REQUIRE( success == 2 );
}
}