Change std::queue to std::deque typedef

This shouldn't make any difference with an optimizing compiler, but
makes it a bit easier to experiment with different data structures.
This commit is contained in:
Jason Rhinelander 2022-05-12 12:32:17 -03:00
parent 371606cde0
commit fa6de369b2
No known key found for this signature in database
GPG Key ID: C4992CE7A88D4262
4 changed files with 14 additions and 13 deletions

View File

@ -9,16 +9,16 @@ void OxenMQ::proxy_batch(detail::Batch* batch) {
OMQ_TRACE("proxy queuing batch job with ", jobs, " jobs", tagged_threads ? " (job uses tagged thread(s))" : "");
if (!tagged_threads) {
for (size_t i = 0; i < jobs; i++)
batch_jobs.emplace(batch, i);
batch_jobs.emplace_back(batch, i);
} else {
// Some (or all) jobs have a specific thread target so queue any such jobs in the tagged
// worker queue.
auto threads = batch->threads();
for (size_t i = 0; i < jobs; i++) {
auto& jobs = threads[i] > 0
? std::get<std::queue<batch_job>>(tagged_workers[threads[i] - 1])
? std::get<batch_queue>(tagged_workers[threads[i] - 1])
: batch_jobs;
jobs.emplace(batch, i);
jobs.emplace_back(batch, i);
}
}
@ -39,11 +39,11 @@ void OxenMQ::proxy_schedule_reply_job(std::function<void()> f) {
proxy_skip_one_poll = true;
}
void OxenMQ::proxy_run_batch_jobs(std::queue<batch_job>& jobs, const int reserved, int& active, bool reply) {
void OxenMQ::proxy_run_batch_jobs(batch_queue& jobs, const int reserved, int& active, bool reply) {
while (!jobs.empty() && active_workers() < max_workers &&
(active < reserved || active_workers() < general_workers)) {
proxy_run_worker(get_idle_worker().load(std::move(jobs.front()), reply));
jobs.pop();
jobs.pop_front();
active++;
}
}
@ -114,9 +114,9 @@ void OxenMQ::_queue_timer_job(int timer_id) {
OMQ_TRACE("b: ", b->size().first, ", ", b->size().second, "; thread = ", thread);
assert(b->size() == std::make_pair(size_t{1}, thread > 0));
auto& queue = thread > 0
? std::get<std::queue<batch_job>>(tagged_workers[thread - 1])
? std::get<batch_queue>(tagged_workers[thread - 1])
: batch_jobs;
queue.emplace(static_cast<detail::Batch*>(b), 0);
queue.emplace_back(static_cast<detail::Batch*>(b), 0);
}
void OxenMQ::add_timer(TimerID& timer, std::function<void()> job, std::chrono::milliseconds interval, bool squelch, std::optional<TaggedThreadID> thread) {

View File

@ -579,13 +579,14 @@ private:
/// Individual batch jobs waiting to run; .second is the 0-n batch number or -1 for the
/// completion job
using batch_job = std::pair<detail::Batch*, int>;
std::queue<batch_job> batch_jobs, reply_jobs;
using batch_queue = std::deque<batch_job>;
batch_queue batch_jobs, reply_jobs;
int batch_jobs_active = 0;
int reply_jobs_active = 0;
int batch_jobs_reserved = -1;
int reply_jobs_reserved = -1;
/// Runs any queued batch jobs
void proxy_run_batch_jobs(std::queue<batch_job>& jobs, int reserved, int& active, bool reply);
void proxy_run_batch_jobs(batch_queue& jobs, int reserved, int& active, bool reply);
/// BATCH command. Called with a Batch<R> (see oxenmq/batch.h) object pointer for the proxy to
/// take over and queue batch jobs.
@ -768,7 +769,7 @@ private:
/// Workers that are reserved for tagged thread tasks (as created with add_tagged_thread). The
/// queue here is similar to worker_jobs, but contains only the tagged thread's jobs. The bool
/// is whether the worker is currently busy (true) or available (false).
std::vector<std::tuple<run_info, bool, std::queue<batch_job>>> tagged_workers;
std::vector<std::tuple<run_info, bool, batch_queue>> tagged_workers;
public:
/**

View File

@ -731,7 +731,7 @@ void OxenMQ::proxy_process_queue() {
if (!busy && !queue.empty()) {
busy = true;
proxy_run_worker(run.load(std::move(queue.front()), false, run.worker_id));
queue.pop();
queue.pop_front();
}
}

View File

@ -233,11 +233,11 @@ void OxenMQ::proxy_worker_message(std::vector<zmq::message_t>& parts) {
} else {
auto& jobs =
thread > 0
? std::get<std::queue<batch_job>>(tagged_workers[thread - 1]) // run in tagged thread
? std::get<batch_queue>(tagged_workers[thread - 1]) // run in tagged thread
: run.is_reply_job
? reply_jobs
: batch_jobs;
jobs.emplace(batch, -1);
jobs.emplace_back(batch, -1);
}
} else if (state == detail::BatchState::done) {
// No completion job