Add ability to run a batch completion job in the proxy thread

For very small jobs (i.e. just setting a flag or something) this is
going to be faster than dispatching to a thread.
This commit is contained in:
Jason Rhinelander 2020-02-11 02:15:22 -04:00
parent 63c71396be
commit 3ff66490ad
2 changed files with 30 additions and 2 deletions

View File

@ -38,7 +38,8 @@ namespace detail {
enum class BatchStatus {
running, // there are still jobs to run (or running)
complete, // the batch is complete but still has a completion function to call
complete, // the batch is complete but still has a completion job to call
complete_proxy, // same as `complete`, but the completion job should be invoked immediately in the proxy thread (be very careful)
done // the batch is complete and has no completion function
};
@ -154,6 +155,7 @@ private:
std::vector<job_result<R>> results;
CompletionFunc complete;
std::size_t jobs_outstanding = 0;
bool complete_in_proxy = false;
bool started = false;
void check_not_started() {
@ -184,9 +186,22 @@ public:
/// then jobs simply run and results are discarded.
void completion(CompletionFunc comp) {
check_not_started();
if (complete)
throw std::logic_error("Completion function can only be set once");
complete = std::move(comp);
}
/// Sets a completion function to invoke *IN THE PROXY THREAD* after all jobs have finished. Be
/// very, very careful: this should not be a job that takes any significant amount of CPU time
/// or can block for any reason (NO MUTEXES).
void completion_proxy(CompletionFunc comp) {
check_not_started();
if (complete)
throw std::logic_error("Completion function can only be set once");
complete = std::move(comp);
complete_in_proxy = true;
}
private:
std::size_t size() const override {
@ -212,7 +227,9 @@ private:
if (jobs_outstanding)
return detail::BatchStatus::running;
if (complete)
return detail::BatchStatus::complete;
return complete_in_proxy
? detail::BatchStatus::complete_proxy
: detail::BatchStatus::complete;
return detail::BatchStatus::done;
}

View File

@ -1001,6 +1001,17 @@ void LokiMQ::proxy_worker_message(std::vector<zmq::message_t>& parts) {
auto status = run.batch->job_finished();
if (status == detail::BatchStatus::complete) {
batch_jobs.emplace(run.batch, -1);
} else if (status == detail::BatchStatus::complete_proxy) {
try {
run.batch->job_completion(); // RUN DIRECTLY IN PROXY THREAD
} catch (const std::exception &e) {
// Raise these to error levels: you really shouldn't be doing anything
// complicated in an in-proxy completion function!
LMQ_LOG(error, "proxy thread caught exception when processing in-proxy completion command: ", e.what());
} catch (...) {
LMQ_LOG(error, "proxy thread caught non-standard exception when processing in-proxy completion command");
}
clear_job = true;
} else if (status == detail::BatchStatus::done) {
clear_job = true;
}