Jason Rhinelander 253f1ee66e Move socket holding into LokiMQ instance
The thread_local `std::map` here can end up being destructed *before*
the LokiMQ instance (if both are being destroyed during thread joining),
in which case we segfault by trying to use the map.  Move the owning
container into the LokiMQ instead (indexed by the thread) to prevent

Also cleans this code up by:

- Don't close control sockets from the proxy thread; socket_t's aren't
necessarily thread safe so this could be causing issues where we trouble
double-closing or using a closed socket.

- We can just let them get closed during destruction of the LokiMQ.

- Avoid needing shared_ptr's; instead we can just use a unique pointer
with raw pointers in the thread_local cache.  This simplifies closing
because all closing will happen during the LokiMQ destruction.
2020-11-17 11:54:39 -04:00

709 lines
30 KiB

#include "lokimq.h"
#include "lokimq-internal.h"
#include "hex.h"
#if defined(__FreeBSD__) || defined(__NetBSD__) || defined(__OpenBSD__)
extern "C" {
#include <pthread.h>
#include <pthread_np.h>
#ifndef _WIN32
extern "C" {
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
namespace lokimq {
void LokiMQ::proxy_quit() {
LMQ_LOG(debug, "Received quit command, shutting down proxy thread");
assert(std::none_of(workers.begin(), workers.end(), [](auto& worker) { return worker.worker_thread.joinable(); }));
assert(std::none_of(tagged_workers.begin(), tagged_workers.end(), [](auto& worker) { return std::get<0>(worker).worker_thread.joinable(); }));
command.set(zmq::sockopt::linger, 0);
std::lock_guard lock{control_sockets_mutex};
proxy_shutting_down = true; // To prevent threads from opening new control sockets
int linger = std::chrono::milliseconds{CLOSE_LINGER}.count();
for (auto& s : connections)
s.set(zmq::sockopt::linger, linger);
LMQ_LOG(debug, "Proxy thread teardown complete");
void LokiMQ::proxy_send(bt_dict_consumer data) {
// NB: bt_dict_consumer goes in alphabetical order
std::string_view hint;
std::chrono::milliseconds keep_alive{DEFAULT_SEND_KEEP_ALIVE};
std::chrono::milliseconds request_timeout{DEFAULT_REQUEST_TIMEOUT};
bool optional = false;
bool outgoing = false;
bool incoming = false;
bool request = false;
bool have_conn_id = false;
ConnectionID conn_id;
std::string request_tag;
ReplyCallback request_callback;
if (data.skip_until("conn_id")) {
conn_id.id = data.consume_integer<long long>();
if (conn_id.id == -1)
throw std::runtime_error("Invalid error: invalid conn_id value (-1)");
have_conn_id = true;
if (data.skip_until("conn_pubkey")) {
if (have_conn_id)
throw std::runtime_error("Internal error: Invalid proxy send command; conn_id and conn_pubkey are exclusive");
conn_id.pk = data.consume_string();
conn_id.id = ConnectionID::SN_ID;
} else if (!have_conn_id)
throw std::runtime_error("Internal error: Invalid proxy send command; conn_pubkey or conn_id missing");
if (data.skip_until("conn_route"))
conn_id.route = data.consume_string();
if (data.skip_until("hint"))
hint = data.consume_string_view();
if (data.skip_until("incoming"))
incoming = data.consume_integer<bool>();
if (data.skip_until("keep_alive"))
keep_alive = std::chrono::milliseconds{data.consume_integer<uint64_t>()};
if (data.skip_until("optional"))
optional = data.consume_integer<bool>();
if (data.skip_until("outgoing"))
outgoing = data.consume_integer<bool>();
if (data.skip_until("request"))
request = data.consume_integer<bool>();
if (request) {
if (!data.skip_until("request_callback"))
throw std::runtime_error("Internal error: received request without request_callback");
request_callback = detail::deserialize_object<ReplyCallback>(data.consume_integer<uintptr_t>());
if (!data.skip_until("request_tag"))
throw std::runtime_error("Internal error: received request without request_name");
request_tag = data.consume_string();
if (data.skip_until("request_timeout"))
request_timeout = std::chrono::milliseconds{data.consume_integer<uint64_t>()};
if (!data.skip_until("send"))
throw std::runtime_error("Internal error: Invalid proxy send command; send parts missing");
bt_list_consumer send = data.consume_list_consumer();
send_option::queue_failure::callback_t callback_nosend;
if (data.skip_until("send_fail"))
callback_nosend = detail::deserialize_object<decltype(callback_nosend)>(data.consume_integer<uintptr_t>());
send_option::queue_full::callback_t callback_noqueue;
if (data.skip_until("send_full_q"))
callback_noqueue = detail::deserialize_object<decltype(callback_noqueue)>(data.consume_integer<uintptr_t>());
// Now figure out which socket to send to and do the actual sending. We can repeat this loop
// multiple times, if we're sending to a SN, because it's possible that we have multiple
// connections open to that SN (e.g. one out + one in) so if one fails we can clean up that
// connection and try the next one.
bool retry = true, sent = false, nowarn = false;
while (retry) {
retry = false;
zmq::socket_t *send_to;
if (conn_id.sn()) {
auto sock_route = proxy_connect_sn(conn_id.pk, hint, optional, incoming, outgoing, keep_alive);
if (!sock_route.first) {
nowarn = true;
if (optional)
LMQ_LOG(debug, "Not sending: send is optional and no connection to ",
to_hex(conn_id.pk), " is currently established");
LMQ_LOG(error, "Unable to send to ", to_hex(conn_id.pk), ": no valid connection address found");
send_to = sock_route.first;
conn_id.route = std::move(sock_route.second);
} else if (!conn_id.route.empty()) { // incoming non-SN connection
auto it = incoming_conn_index.find(conn_id.unrouted());
if (it == incoming_conn_index.end()) {
LMQ_LOG(warn, "Unable to send to ", conn_id, ": incoming listening socket not found");
send_to = &connections[it->second];
} else {
auto pr = peers.equal_range(conn_id);
if (pr.first == peers.end()) {
LMQ_LOG(warn, "Unable to send: connection id ", conn_id, " is not (or is no longer) a valid outgoing connection");
auto& peer = pr.first->second;
send_to = &connections[peer.conn_index];
try {
sent = send_message_parts(*send_to, build_send_parts(send, conn_id.route));
} catch (const zmq::error_t &e) {
if (e.num() == EHOSTUNREACH && !conn_id.route.empty() /*= incoming conn*/) {
LMQ_LOG(debug, "Incoming connection is no longer valid; removing peer details");
auto pr = peers.equal_range(conn_id);
if (pr.first != peers.end()) {
if (!conn_id.sn()) {
} else {
bool removed;
for (auto it = pr.first; it != pr.second; ) {
auto& peer = it->second;
if (peer.route == conn_id.route) {
removed = true;
// The incoming connection to the SN is no longer good, but we can retry because
// we may have another active connection with the SN (or may want to open one).
if (removed) {
LMQ_LOG(debug, "Retrying sending to SN ", to_hex(conn_id.pk), " using other sockets");
retry = true;
if (!retry) {
LMQ_LOG(warn, "Unable to send message to ", conn_id, ": ", e.what());
nowarn = true;
if (callback_nosend) {
job([callback = std::move(callback_nosend), error = e] { callback(&error); });
callback_nosend = nullptr;
if (request) {
if (sent) {
LMQ_LOG(debug, "Added new pending request ", to_hex(request_tag));
pending_requests.insert({ request_tag, {
std::chrono::steady_clock::now() + request_timeout, std::move(request_callback) }});
} else {
LMQ_LOG(debug, "Could not send request, scheduling request callback failure");
job([callback = std::move(request_callback)] { callback(false, {{"TIMEOUT"s}}); });
if (!sent) {
if (callback_nosend)
job([callback = std::move(callback_nosend)] { callback(nullptr); });
else if (callback_noqueue)
else if (!nowarn)
LMQ_LOG(warn, "Unable to send message to ", conn_id, ": sending would block");
void LokiMQ::proxy_reply(bt_dict_consumer data) {
bool have_conn_id = false;
ConnectionID conn_id{0};
if (data.skip_until("conn_id")) {
conn_id.id = data.consume_integer<long long>();
if (conn_id.id == -1)
throw std::runtime_error("Invalid error: invalid conn_id value (-1)");
have_conn_id = true;
if (data.skip_until("conn_pubkey")) {
if (have_conn_id)
throw std::runtime_error("Internal error: Invalid proxy reply command; conn_id and conn_pubkey are exclusive");
conn_id.pk = data.consume_string();
conn_id.id = ConnectionID::SN_ID;
} else if (!have_conn_id)
throw std::runtime_error("Internal error: Invalid proxy reply command; conn_pubkey or conn_id missing");
if (!data.skip_until("send"))
throw std::runtime_error("Internal error: Invalid proxy reply command; send parts missing");
bt_list_consumer send = data.consume_list_consumer();
auto pr = peers.equal_range(conn_id);
if (pr.first == pr.second) {
LMQ_LOG(warn, "Unable to send tagged reply: the connection is no longer valid");
// We try any connections until one works (for ordinary remotes there will be just one, but for
// SNs there might be one incoming and one outgoing).
for (auto it = pr.first; it != pr.second; ) {
try {
send_message_parts(connections[it->second.conn_index], build_send_parts(send, it->second.route));
} catch (const zmq::error_t &err) {
if (err.num() == EHOSTUNREACH) {
LMQ_LOG(debug, "Unable to send reply to incoming non-SN request: remote is no longer connected; removing peer details");
it = peers.erase(it);
} else {
LMQ_LOG(warn, "Unable to send reply to incoming non-SN request: ", err.what());
void LokiMQ::proxy_control_message(std::vector<zmq::message_t>& parts) {
// We throw an uncaught exception here because we only generate control messages internally in
// lokimq code: if one of these condition fail it's a lokimq bug.
if (parts.size() < 2)
throw std::logic_error("LokiMQ bug: Expected 2-3 message parts for a proxy control message");
auto route = view(parts[0]), cmd = view(parts[1]);
LMQ_TRACE("control message: ", cmd);
if (parts.size() == 3) {
LMQ_TRACE("...: ", parts[2]);
auto data = view(parts[2]);
if (cmd == "SEND") {
LMQ_TRACE("proxying message");
return proxy_send(data);
} else if (cmd == "REPLY") {
LMQ_TRACE("proxying reply to non-SN incoming message");
return proxy_reply(data);
} else if (cmd == "BATCH") {
LMQ_TRACE("proxy batch jobs");
auto ptrval = bt_deserialize<uintptr_t>(data);
return proxy_batch(reinterpret_cast<detail::Batch*>(ptrval));
} else if (cmd == "INJECT") {
LMQ_TRACE("proxy inject");
return proxy_inject_task(detail::deserialize_object<injected_task>(bt_deserialize<uintptr_t>(data)));
} else if (cmd == "SET_SNS") {
return proxy_set_active_sns(data);
} else if (cmd == "UPDATE_SNS") {
return proxy_update_active_sns(data);
} else if (cmd == "CONNECT_SN") {
} else if (cmd == "CONNECT_REMOTE") {
return proxy_connect_remote(data);
} else if (cmd == "DISCONNECT") {
return proxy_disconnect(data);
} else if (cmd == "TIMER") {
return proxy_timer(data);
} else if (parts.size() == 2) {
if (cmd == "START") {
// Command send by the owning thread during startup; we send back a simple READY reply to
// let it know we are running.
return route_control(command, route, "READY");
} else if (cmd == "QUIT") {
// Asked to quit: set max_workers to zero and tell any idle ones to quit. We will
// close workers as they come back to READY status, and then close external
// connections once all workers are done.
max_workers = 0;
for (const auto &route : idle_workers)
route_control(workers_socket, workers[route].worker_routing_id, "QUIT");
for (auto& [run, busy, queue] : tagged_workers)
if (!busy)
route_control(workers_socket, run.worker_routing_id, "QUIT");
throw std::runtime_error("LokiMQ bug: Proxy received invalid control command: " +
std::string{cmd} + " (" + std::to_string(parts.size()) + ")");
void LokiMQ::proxy_loop() {
#if defined(__linux__) || defined(__sun) || defined(__MINGW32__)
pthread_setname_np(pthread_self(), "lmq-proxy");
#elif defined(__FreeBSD__) || defined(__NetBSD__) || defined(__OpenBSD__)
pthread_set_name_np(pthread_self(), "lmq-proxy");
#elif defined(__MACH__)
zap_auth.set(zmq::sockopt::linger, 0);
workers_socket.set(zmq::sockopt::router_mandatory, true);
assert(general_workers > 0);
if (batch_jobs_reserved < 0)
batch_jobs_reserved = (general_workers + 1) / 2;
if (reply_jobs_reserved < 0)
reply_jobs_reserved = (general_workers + 7) / 8;
max_workers = general_workers + batch_jobs_reserved + reply_jobs_reserved;
for (const auto& cat : categories) {
max_workers += cat.second.reserved_threads;
if (log_level() >= LogLevel::debug) {
LMQ_LOG(debug, "Reserving space for ", max_workers, " max workers = ", general_workers, " general plus reservations for:");
for (const auto& cat : categories)
LMQ_LOG(debug, " - ", cat.first, ": ", cat.second.reserved_threads);
LMQ_LOG(debug, " - (batch jobs): ", batch_jobs_reserved);
LMQ_LOG(debug, " - (reply jobs): ", reply_jobs_reserved);
LMQ_LOG(debug, "Plus ", tagged_workers.size(), " tagged worker threads");
if (!workers.empty())
throw std::logic_error("Internal error: proxy thread started with active worker threads");
#ifndef _WIN32
int saved_umask = -1;
saved_umask = umask(STARTUP_UMASK);
for (size_t i = 0; i < bind.size(); i++) {
auto& b = bind[i].second;
zmq::socket_t listener{context, zmq::socket_type::router};
listener.set(zmq::sockopt::zap_domain, bt_serialize(i));
if (b.curve) {
listener.set(zmq::sockopt::curve_server, true);
listener.set(zmq::sockopt::curve_publickey, pubkey);
listener.set(zmq::sockopt::curve_secretkey, privkey);
listener.set(zmq::sockopt::router_handover, true);
listener.set(zmq::sockopt::router_mandatory, true);
LMQ_LOG(info, "LokiMQ listening on ", bind[i].first);
auto conn_id = next_conn_id++;
incoming_conn_index[conn_id] = connections.size() - 1;
b.index = connections.size() - 1;
#ifndef _WIN32
if (saved_umask != -1)
// set socket gid / uid if it is provided
if (SOCKET_GID != -1 or SOCKET_UID != -1) {
for(size_t i = 0; i < bind.size(); i++) {
const address addr(bind[i].first);
if(addr.ipc()) {
if(chown(addr.socket.c_str(), SOCKET_UID, SOCKET_GID) == -1) {
throw std::runtime_error("cannot set group on " + addr.socket + ": " + strerror(errno));
pollitems_stale = true;
// Also add an internal connection to self so that calling code can avoid needing to
// special-case rare situations where we are supposed to talk to a quorum member that happens to
// be ourselves (which can happen, for example, with cross-quoum Blink communication)
// FIXME: not working
if (!timers)
auto do_conn_cleanup = [this] { proxy_conn_cleanup(); };
using CleanupLambda = decltype(do_conn_cleanup);
if (-1 == zmq_timers_add(timers.get(),
// Wrap our lambda into a C function pointer where we pass in the lambda pointer as extra arg
[](int /*timer_id*/, void* cleanup) { (*static_cast<CleanupLambda*>(cleanup))(); },
&do_conn_cleanup)) {
throw zmq::error_t{};
std::vector<zmq::message_t> parts;
// Wait for tagged worker threads to get ready and connect to us (we get a "STARTING" message)
// and send them back a "START" to let them know to go ahead with startup. We need this
// synchronization dance to guarantee that the workers are routable before we can proceed.
if (!tagged_workers.empty()) {
LMQ_LOG(debug, "Waiting for tagged workers");
std::unordered_set<std::string_view> waiting_on;
for (auto& w : tagged_workers)
for (; !waiting_on.empty(); parts.clear()) {
recv_message_parts(workers_socket, parts);
if (parts.size() != 2 || view(parts[1]) != "STARTING"sv) {
LMQ_LOG(error, "Received invalid message on worker socket while waiting for tagged thread startup");
LMQ_LOG(debug, "Received STARTING message from ", view(parts[0]));
if (auto it = waiting_on.find(view(parts[0])); it != waiting_on.end())
LMQ_LOG(error, "Received STARTING message from unknown worker ", view(parts[0]));
for (auto&w : tagged_workers) {
LMQ_LOG(debug, "Telling tagged thread worker ", std::get<run_info>(w).worker_routing_id, " to finish startup");
route_control(workers_socket, std::get<run_info>(w).worker_routing_id, "START");
while (true) {
std::chrono::milliseconds poll_timeout;
if (max_workers == 0) { // Will be 0 only if we are quitting
if (std::none_of(workers.begin(), workers.end(), [](auto &w) { return w.worker_thread.joinable(); }) &&
std::none_of(tagged_workers.begin(), tagged_workers.end(), [](auto &w) { return std::get<0>(w).worker_thread.joinable(); })) {
// All the workers have finished, so we can finish shutting down
return proxy_quit();
poll_timeout = 1s; // We don't keep running timers when we're quitting, so don't have a timer to check
} else {
poll_timeout = std::chrono::milliseconds{zmq_timers_timeout(timers.get())};
if (proxy_skip_one_poll)
proxy_skip_one_poll = false;
else {
LMQ_TRACE("polling for new messages");
if (pollitems_stale)
// We poll the control socket and worker socket for any incoming messages. If we have
// available worker room then also poll incoming connections and outgoing connections
// for messages to forward to a worker. Otherwise, we just look for a control message
// or a worker coming back with a ready message.
zmq::poll(pollitems.data(), pollitems.size(), poll_timeout);
LMQ_TRACE("processing control messages");
// Retrieve any waiting incoming control messages
for (parts.clear(); recv_message_parts(command, parts, zmq::recv_flags::dontwait); parts.clear()) {
LMQ_TRACE("processing worker messages");
for (parts.clear(); recv_message_parts(workers_socket, parts, zmq::recv_flags::dontwait); parts.clear()) {
LMQ_TRACE("processing timers");
// Handle any zap authentication
LMQ_TRACE("processing zap requests");
// See if we can drain anything from the current queue before we potentially add to it
// below.
LMQ_TRACE("processing queued jobs and messages");
LMQ_TRACE("processing new incoming messages");
// We round-robin connections when pulling off pending messages one-by-one rather than
// pulling off all messages from one connection before moving to the next; thus in cases of
// contention we end up fairly distributing.
const int num_sockets = connections.size();
std::queue<int> queue_index;
for (int i = 0; i < num_sockets; i++)
for (parts.clear(); !queue_index.empty(); parts.clear()) {
size_t i = queue_index.front();
auto& sock = connections[i];
if (!recv_message_parts(sock, parts, zmq::recv_flags::dontwait))
// We only pull this one message now but then requeue the socket so that after we check
// all other sockets we come back to this one to check again.
if (parts.empty()) {
LMQ_LOG(warn, "Ignoring empty (0-part) incoming message");
if (!proxy_handle_builtin(i, parts))
proxy_to_worker(i, parts);
if (pollitems_stale) {
// If our items became stale then we may have just closed a connection and so our
// queue index maybe also be stale, so restart the proxy loop (so that we rebuild
// pollitems).
LMQ_TRACE("pollitems became stale; short-circuiting incoming message loop");
LMQ_TRACE("done proxy loop");
static bool is_error_response(std::string_view cmd) {
return cmd == "FORBIDDEN" || cmd == "FORBIDDEN_SN" || cmd == "NOT_A_SERVICE_NODE" || cmd == "UNKNOWNCOMMAND" || cmd == "NO_REPLY_TAG";
// Return true if we recognized/handled the builtin command (even if we reject it for whatever
// reason)
bool LokiMQ::proxy_handle_builtin(size_t conn_index, std::vector<zmq::message_t>& parts) {
// Doubling as a bool and an offset:
size_t incoming = connections[conn_index].get(zmq::sockopt::type) == ZMQ_ROUTER;
std::string_view route, cmd;
if (parts.size() < 1 + incoming) {
LMQ_LOG(warn, "Received empty message; ignoring");
return true;
if (incoming) {
route = view(parts[0]);
cmd = view(parts[1]);
} else {
cmd = view(parts[0]);
LMQ_TRACE("Checking for builtins: '", cmd, "' from ", peer_address(parts.back()));
if (cmd == "REPLY") {
size_t tag_pos = 1 + incoming;
if (parts.size() <= tag_pos) {
LMQ_LOG(warn, "Received REPLY without a reply tag; ignoring");
return true;
std::string reply_tag{view(parts[tag_pos])};
auto it = pending_requests.find(reply_tag);
if (it != pending_requests.end()) {
LMQ_LOG(debug, "Received REPLY for pending command ", to_hex(reply_tag), "; scheduling callback");
std::vector<std::string> data;
data.reserve(parts.size() - (tag_pos + 1));
for (auto it = parts.begin() + (tag_pos + 1); it != parts.end(); ++it)
proxy_schedule_reply_job([callback=std::move(it->second.second), data=std::move(data)] {
callback(true, std::move(data));
} else {
LMQ_LOG(warn, "Received REPLY with unknown or already handled reply tag (", to_hex(reply_tag), "); ignoring");
return true;
} else if (cmd == "HI") {
if (!incoming) {
LMQ_LOG(warn, "Got invalid 'HI' message on an outgoing connection; ignoring");
return true;
LMQ_LOG(debug, "Incoming client from ", peer_address(parts.back()), " sent HI, replying with HELLO");
try {
send_routed_message(connections[conn_index], std::string{route}, "HELLO");
} catch (const std::exception &e) { LMQ_LOG(warn, "Couldn't reply with HELLO: ", e.what()); }
return true;
} else if (cmd == "HELLO") {
if (incoming) {
LMQ_LOG(warn, "Got invalid 'HELLO' message on an incoming connection; ignoring");
return true;
auto it = std::find_if(pending_connects.begin(), pending_connects.end(),
[&](auto& pc) { return std::get<size_t>(pc) == conn_index; });
if (it == pending_connects.end()) {
LMQ_LOG(warn, "Got invalid 'HELLO' message on an already handshaked incoming connection; ignoring");
return true;
auto& pc = *it;
auto pit = peers.find(std::get<long long>(pc));
if (pit == peers.end()) {
LMQ_LOG(warn, "Got invalid 'HELLO' message with invalid conn_id; ignoring");
return true;
LMQ_LOG(debug, "Got initial HELLO server response from ", peer_address(parts.back()));
conn=conn_index_to_id[conn_index]] {
return true;
} else if (cmd == "BYE") {
if (!incoming) {
LMQ_LOG(debug, "BYE command received; disconnecting from ", peer_address(parts.back()));
proxy_close_connection(conn_index, 0s);
} else {
LMQ_LOG(warn, "Got invalid 'BYE' command on an incoming socket; ignoring");
return true;
else if (is_error_response(cmd)) {
// These messages (FORBIDDEN, UNKNOWNCOMMAND, etc.) are sent in response to us trying to
// invoke something that doesn't exist or we don't have permission to access. These have
// two forms (the latter is only sent by remotes running 1.1.0+).
// - ["XXX", "whatever.command"]
// - ["XXX", "REPLY", replytag]
// (ignoring the routing prefix on incoming commands).
// For the former, we log; for the latter we trigger the reply callback with a failure
if (parts.size() == (1 + incoming) && cmd == "UNKNOWNCOMMAND") {
// pre-1.1.0 sent just a plain UNKNOWNCOMMAND (without the actual command); this was not
// useful, but also this response is *expected* for things 1.0.5 didn't understand, like
// FORBIDDEN_SN: so log it only at debug level and move on.
LMQ_LOG(debug, "Received plain UNKNOWNCOMMAND; remote is probably an older lokimq. Ignoring.");
return true;
if (parts.size() == (3 + incoming) && view(parts[1 + incoming]) == "REPLY") {
std::string reply_tag{view(parts[2 + incoming])};
auto it = pending_requests.find(reply_tag);
if (it != pending_requests.end()) {
LMQ_LOG(debug, "Received ", cmd, " REPLY for pending command ", to_hex(reply_tag), "; scheduling failure callback");
proxy_schedule_reply_job([callback=std::move(it->second.second), cmd=std::string{cmd}] {
callback(false, {{std::move(cmd)}});
} else {
LMQ_LOG(warn, "Received REPLY with unknown or already handled reply tag (", to_hex(reply_tag), "); ignoring");
} else {
LMQ_LOG(warn, "Received ", cmd, ':', (parts.size() > 1 + incoming ? view(parts[1 + incoming]) : "(unknown command)"sv),
" from ", peer_address(parts.back()));
return true;
return false;
void LokiMQ::proxy_process_queue() {
if (max_workers == 0) // shutting down
// First: send any tagged thread tasks to the tagged threads, if idle
for (auto& [run, busy, queue] : tagged_workers) {
if (!busy && !queue.empty()) {
busy = true;
proxy_run_worker(run.load(std::move(queue.front()), false, run.worker_id));
// Second: process any batch jobs; since these are internal they are given higher priority.
proxy_run_batch_jobs(batch_jobs, batch_jobs_reserved, batch_jobs_active, false);
// Next any reply batch jobs (which are a bit different from the above, since they are
// externally triggered but for things we initiated locally).
proxy_run_batch_jobs(reply_jobs, reply_jobs_reserved, reply_jobs_active, true);
// Finally general incoming commands
for (auto it = pending_commands.begin(); it != pending_commands.end() && active_workers() < max_workers; ) {
auto& pending = *it;
if (pending.cat.active_threads < pending.cat.reserved_threads
|| active_workers() < general_workers) {
assert(pending.cat.queued >= 0);
it = pending_commands.erase(it);
} else {
++it; // no available general or reserved worker spots for this job right now