Distinguish between batch jobs and reply jobs

This adds a separate category (and reserve count) for "reply jobs",
which are jobs triggered by receiving a reply to a request, or after a
successful connect or unsuccessful timeout.  Previously these were
scheduled as regular batch jobs; this schedules them as a new "reply
jobs" category with its own reserved threads count.

This also changes the defaults for batch jobs and reply jobs to be based
on the specified general workers count rather than directly on hardware
concurrency, so that if you are on a 16-thread CPU but override general
workers from its default of 16 to 4 and don't change batch workers you
now get reserved batch workers set to 2 rather than 8 which constrains
the typical parallel batch jobs to 4 (i.e. the general worker limit)
rather than exceeding it with the batch job limit.

Similarly for reply jobs, which is now ceil(general/8) by default.
This commit is contained in:
Jason Rhinelander 2020-02-28 17:54:00 -04:00
parent 57f0ca74da
commit 2743e576b2
2 changed files with 121 additions and 73 deletions

View File

@ -821,15 +821,15 @@ void LokiMQ::proxy_batch(detail::Batch* batch) {
const int jobs = batch->size();
for (int i = 0; i < jobs; i++)
batch_jobs.emplace(batch, i);
proxy_skip_one_poll = true;
Batch<void>* LokiMQ::proxy_schedule_job(std::function<void()> f) {
void LokiMQ::proxy_schedule_reply_job(std::function<void()> f) {
auto* b = new Batch<void>;
batch_jobs.emplace(static_cast<detail::Batch*>(b), 0);
proxy_skip_poll = true;
return b;
reply_jobs.emplace(static_cast<detail::Batch*>(b), 0);
proxy_skip_one_poll = true;
// Called either within the proxy thread, or before the proxy thread has been created; actually adds
@ -954,7 +954,7 @@ void LokiMQ::proxy_conn_cleanup() {
// Drop idle connections (if we haven't done it in a while) but *only* if we have some idle
// general workers: if we don't have any idle workers then we may still have incoming messages which
// we haven't processed yet and those messages might end up resetting the last activity time.
if (workers.size() < general_workers) {
if (static_cast<int>(workers.size()) < general_workers) {
LMQ_TRACE("closing idle connections");
@ -1012,10 +1012,13 @@ void LokiMQ::proxy_loop() {
workers_socket.setsockopt<int>(ZMQ_ROUTER_MANDATORY, 1);
if (general_workers == 0)
general_workers = std::max(std::thread::hardware_concurrency(), 1u);
assert(general_workers > 0);
if (batch_jobs_reserved < 0)
batch_jobs_reserved = (general_workers + 1) / 2;
if (reply_jobs_reserved < 0)
reply_jobs_reserved = (general_workers + 7) / 8;
max_workers = general_workers + batch_jobs_reserved;
max_workers = general_workers + batch_jobs_reserved + reply_jobs_reserved;
for (const auto& cat : categories) {
max_workers += cat.second.reserved_threads;
@ -1026,6 +1029,7 @@ void LokiMQ::proxy_loop() {
for (const auto& cat : categories)
LMQ_TRACE(" - ", cat.first, ": ", cat.second.reserved_threads);
LMQ_TRACE(" - (batch jobs): ", batch_jobs_reserved);
LMQ_TRACE(" - (reply jobs): ", reply_jobs_reserved);
@ -1093,8 +1097,8 @@ void LokiMQ::proxy_loop() {
poll_timeout = std::chrono::milliseconds{zmq_timers_timeout(timers.get())};
if (proxy_skip_poll)
proxy_skip_poll = false;
if (proxy_skip_one_poll)
proxy_skip_one_poll = false;
else {
LMQ_TRACE("polling for new messages");
@ -1141,7 +1145,7 @@ void LokiMQ::proxy_loop() {
for (int i = 0; i < num_sockets; i++)
for (parts.clear(); !queue_index.empty() && workers.size() < max_workers; parts.clear()) {
for (parts.clear(); !queue_index.empty() && static_cast<int>(workers.size()) < max_workers; parts.clear()) {
size_t i = queue_index.front();
auto& sock = connections[i];
@ -1225,8 +1229,10 @@ void LokiMQ::proxy_worker_message(std::vector<zmq::message_t>& parts) {
if (cmd == "RAN") {
LMQ_LOG(debug, "Worker ", route, " finished ", run.command);
if (run.is_batch_job) {
assert(batch_jobs_active > 0);
auto& jobs = run.is_reply_job ? reply_jobs : batch_jobs;
auto& active = run.is_reply_job ? reply_jobs_active : batch_jobs_active;
assert(active > 0);
bool clear_job = false;
if (run.batch_jobno == -1) {
// Returned from the completion function
@ -1234,13 +1240,13 @@ void LokiMQ::proxy_worker_message(std::vector<zmq::message_t>& parts) {
} else {
auto status = run.batch->job_finished();
if (status == detail::BatchStatus::complete) {
batch_jobs.emplace(run.batch, -1);
jobs.emplace(run.batch, -1);
} else if (status == detail::BatchStatus::complete_proxy) {
try {
run.batch->job_completion(); // RUN DIRECTLY IN PROXY THREAD
} catch (const std::exception &e) {
// Raise these to error levels: you really shouldn't be doing anything
// complicated in an in-proxy completion function!
// Raise these to error levels: the caller really shouldn't be doing
// anything non-trivial in an in-proxy completion function!
LMQ_LOG(error, "proxy thread caught exception when processing in-proxy completion command: ", e.what());
} catch (...) {
LMQ_LOG(error, "proxy thread caught non-standard exception when processing in-proxy completion command");
@ -1306,7 +1312,7 @@ bool LokiMQ::proxy_handle_builtin(size_t conn_index, std::vector<zmq::message_t>
data.reserve(parts.size() - (tag_pos + 1));
for (auto it = parts.begin() + (tag_pos + 1); it != parts.end(); ++it)
proxy_schedule_job([callback=std::move(it->second.second), data=std::move(data)] {
proxy_schedule_reply_job([callback=std::move(it->second.second), data=std::move(data)] {
callback(true, std::move(data));
@ -1341,7 +1347,7 @@ bool LokiMQ::proxy_handle_builtin(size_t conn_index, std::vector<zmq::message_t>
LMQ_LOG(info, "Got initial HELLO server response from ", peer_address(parts.back()));
conn=conn_index_to_id[conn_index]] {
@ -1383,36 +1389,56 @@ LokiMQ::run_info& LokiMQ::get_idle_worker() {
return workers[id];
void LokiMQ::set_batch_threads(unsigned int threads) {
void LokiMQ::set_batch_threads(int threads) {
if (proxy_thread.joinable())
throw std::logic_error("Cannot change reserved batch threads after calling `start()`");
if (threads < -1) // -1 is the default which is based on general threads
throw std::out_of_range("Invalid set_batch_threads() value " + std::to_string(threads));
batch_jobs_reserved = threads;
void LokiMQ::set_general_threads(unsigned int threads) {
void LokiMQ::set_reply_threads(int threads) {
if (proxy_thread.joinable())
throw std::logic_error("Cannot change reserved reply threads after calling `start()`");
if (threads < -1) // -1 is the default which is based on general threads
throw std::out_of_range("Invalid set_reply_threads() value " + std::to_string(threads));
reply_jobs_reserved = threads;
void LokiMQ::set_general_threads(int threads) {
if (proxy_thread.joinable())
throw std::logic_error("Cannot change general thread count after calling `start()`");
if (threads < 1)
throw std::out_of_range("Invalid set_general_threads() value " + std::to_string(threads) + ": general threads must be > 0");
general_workers = threads;
LokiMQ::run_info& LokiMQ::run_info::operator=(pending_command&& pending) {
LokiMQ::run_info& LokiMQ::run_info::load(category* cat_, std::string command_, ConnectionID conn_, std::string route_,
std::vector<zmq::message_t> data_parts_, const std::pair<CommandCallback, bool>* callback_) {
is_batch_job = false;
cat = &pending.cat;
command = std::move(pending.command);
conn = std::move(pending.conn);
conn_route = std::move(pending.conn_route);
data_parts = std::move(pending.data_parts);
callback = pending.callback;
is_reply_job = false;
cat = cat_;
command = std::move(command_);
conn = std::move(conn_);
conn_route = std::move(route_);
data_parts = std::move(data_parts_);
callback = callback_;
return *this;
LokiMQ::run_info& LokiMQ::run_info::operator=(batch_job&& bj) {
LokiMQ::run_info& LokiMQ::run_info::load(pending_command&& pending) {
return load(&pending.cat, std::move(pending.command), std::move(pending.conn),
std::move(pending.conn_route), std::move(pending.data_parts), pending.callback);
LokiMQ::run_info& LokiMQ::run_info::load(batch_job&& bj, bool reply_job) {
is_batch_job = true;
is_reply_job = reply_job;
batch_jobno = bj.second;
batch = bj.first;
return *this;
void LokiMQ::proxy_run_worker(run_info& run) {
if (!run.worker_thread.joinable())
run.worker_thread = std::thread{&LokiMQ::worker_thread, this, run.worker_id};
@ -1420,21 +1446,29 @@ void LokiMQ::proxy_run_worker(run_info& run) {
send_routed_message(workers_socket, run.worker_routing_id, "RUN");
void LokiMQ::proxy_run_batch_jobs(std::queue<batch_job>& jobs, const int reserved, int& active, bool reply) {
while (!jobs.empty() &&
(active < reserved || static_cast<int>(workers.size() - idle_workers.size()) < general_workers)) {
proxy_run_worker(get_idle_worker().load(std::move(jobs.front()), reply));
void LokiMQ::proxy_process_queue() {
// First up: process any batch jobs; since these are internally they are given higher priority.
while (!batch_jobs.empty() &&
(batch_jobs_active < batch_jobs_reserved || workers.size() - idle_workers.size() < general_workers)) {
proxy_run_worker(get_idle_worker() = std::move(batch_jobs.front()));
// First up: process any batch jobs; since these are internal they are given higher priority.
proxy_run_batch_jobs(batch_jobs, batch_jobs_reserved, batch_jobs_active, false);
// Next any reply batch jobs (which are a bit different from the above, since they are
// externally triggered but for things we initiated locally).
proxy_run_batch_jobs(reply_jobs, reply_jobs_reserved, reply_jobs_active, true);
// Finally general incoming commands
for (auto it = pending_commands.begin(); it != pending_commands.end() && active_workers() < max_workers; ) {
auto& pending = *it;
if (pending.cat.active_threads < pending.cat.reserved_threads
|| active_workers() < general_workers) {
proxy_run_worker(get_idle_worker() = std::move(pending));
assert(pending.cat.queued >= 0);
@ -1530,24 +1564,14 @@ void LokiMQ::proxy_to_worker(size_t conn_index, std::vector<zmq::message_t>& par
auto& run = get_idle_worker();
run.is_batch_job = false;
run.cat = &category;
run.command = std::move(command);
run.conn.pk = peer->pubkey;
if (peer->service_node) {
run.conn.id = ConnectionID::SN_ID;
} else {
run.conn.id = conn_index_to_id[conn_index].id;
if (outgoing)
run.conn_route = tmp_peer.route;
ConnectionID c{peer->service_node ? ConnectionID::SN_ID : conn_index_to_id[conn_index].id, peer->pubkey};
if (outgoing || peer->service_node)
run.load(&category, std::move(command), std::move(c), std::move(tmp_peer.route),
std::move(data_parts), cat_call.second);
run.data_parts = std::move(data_parts);
run.callback = cat_call.second;
if (outgoing)
peer->activity(); // outgoing connection activity, pump the activity timer
@ -1796,7 +1820,7 @@ void LokiMQ::proxy_connect_remote(bt_dict_consumer data) {
setup_outgoing_socket(sock, remote_pubkey);
} catch (const zmq::error_t &e) {
proxy_schedule_job([conn_id, on_failure=std::move(on_failure), what="connect() failed: "s+e.what()] {
proxy_schedule_reply_job([conn_id, on_failure=std::move(on_failure), what="connect() failed: "s+e.what()] {
on_failure(conn_id, std::move(what));

View File

@ -459,23 +459,23 @@ private:
std::vector<unsigned int> idle_workers;
/// Maximum number of general task workers, specified by g`/during construction
unsigned int general_workers = std::thread::hardware_concurrency();
int general_workers = std::max<int>(1, std::thread::hardware_concurrency());
/// Maximum number of possible worker threads we can have. This is calculated when starting,
/// and equals general_workers plus the sum of all categories' reserved threads counts plus the
/// reserved batch workers count. This is also used to signal a shutdown; we set it to 0 when
/// quitting.
unsigned int max_workers;
int max_workers;
/// Number of active workers
unsigned int active_workers() const { return workers.size() - idle_workers.size(); }
int active_workers() const { return workers.size() - idle_workers.size(); }
/// Worker thread loop
void worker_thread(unsigned int index);
/// If set, skip polling for one proxy loop iteration (set when we know we have something
/// processible without having to shove it onto a socket, such as scheduling an internal job).
bool proxy_skip_poll = false;
bool proxy_skip_one_poll = false;
/// Does the proxying work
void proxy_loop();
@ -486,7 +486,7 @@ private:
void proxy_process_queue();
Batch<void>* proxy_schedule_job(std::function<void()> f);
void proxy_schedule_reply_job(std::function<void()> f);
/// Looks up a peers element given a connect index (for outgoing connections where we already
/// knew the pubkey and SN status) or an incoming zmq message (which has the pubkey and sn
@ -567,13 +567,17 @@ private:
/// weaker (i.e. it cannot reconnect to the SN if the connection is no longer open).
void proxy_reply(bt_dict_consumer data);
/// Currently active batches.
/// Currently active batch/reply jobs; this is the container that owns the Batch instances
std::unordered_set<detail::Batch*> batches;
/// Individual batch jobs waiting to run
using batch_job = std::pair<detail::Batch*, int>;
std::queue<batch_job> batch_jobs;
unsigned int batch_jobs_active = 0;
unsigned int batch_jobs_reserved = std::max((std::thread::hardware_concurrency() + 1) / 2, 1u);
std::queue<batch_job> batch_jobs, reply_jobs;
int batch_jobs_active = 0;
int reply_jobs_active = 0;
int batch_jobs_reserved = -1;
int reply_jobs_reserved = -1;
/// Runs any queued batch jobs
void proxy_run_batch_jobs(std::queue<batch_job>& jobs, int reserved, int& active, bool reply);
/// BATCH command. Called with a Batch<R> (see lokimq/batch.h) object pointer for the proxy to
/// take over and queue batch jobs.
@ -662,6 +666,7 @@ private:
/// transient data we are passing into the thread.
struct run_info {
bool is_batch_job = false;
bool is_reply_job = false;
// If is_batch_job is false then these will be set appropriate (if is_batch_job is true then
// these shouldn't be accessed and likely contain stale data).
@ -684,10 +689,14 @@ private:
size_t worker_id; // The index in `workers`
std::string worker_routing_id; // "w123" where 123 == worker_id
/// Loads the run info with a pending command
run_info& operator=(pending_command&& pending);
/// Loads the run info with an incoming command
run_info& load(category* cat, std::string command, ConnectionID conn, std::string route,
std::vector<zmq::message_t> data_parts, const std::pair<CommandCallback, bool>* callback);
/// Loads the run info with a stored pending command
run_info& load(pending_command&& pending);
/// Loads the run info with a batch job
run_info& operator=(batch_job&& bj);
run_info& load(batch_job&& bj, bool reply_job = false);
/// Data passed to workers for the RUN command. The proxy thread sets elements in this before
/// sending RUN to a worker then the worker uses it to get call info, and only allocates it
@ -825,17 +834,28 @@ public:
void add_command_alias(std::string from, std::string to);
* Sets the number of worker threads reserved for batch jobs. If not called this defaults to
* half the number of hardware threads available (rounded up). This works exactly like
* reserved_threads for a category, but allows to batch jobs. See category for details.
* Sets the number of worker threads reserved for batch jobs. If not explicitly called then
* this defaults to half the general worker threads configured (rounded up). This works exactly
* like reserved_threads for a category, but allows to batch jobs. See category for details.
* Note that some internal jobs are counted as batch jobs: in particular timers added via
* add_timer() and replies received in response to request commands currently each take a batch
* job slot when invoked.
* add_timer() are scheduled as batch jobs.
* Cannot be called after start()ing the LokiMQ instance.
void set_batch_threads(unsigned int threads);
void set_batch_threads(int threads);
* Sets the number of worker threads reserved for handling replies from servers; this is
* mostly for responses to `request()` calls, but also gets used for other network-related
* events such as the ConnectSuccess/ConnectFailure callbacks for establishing remote non-SN
* connections.
* Defaults to one-eighth of the number of configured general threads, rounded up.
* Cannot be changed after start()ing the LokiMQ instance.
void set_reply_threads(int threads);
* Sets the number of general worker threads. This is the target number of threads to run that
@ -844,9 +864,13 @@ public:
* reserved threads can create threads in addition to the amount specified here if necessary to
* fulfill the reserved threads count for the category.
* Adjusting this also adjusts the default values of batch and reply threads, above.
* Defaults to `std::thread::hardware_concurrency()`.
* Cannot be called after start()ing the LokiMQ instance.
void set_general_threads(unsigned int threads);
void set_general_threads(int threads);
* Finish starting up: binds to the bind locations given in the constructor and launches the