From 3ff66490ad6f56da868fcae25ee708dbd2c56e8c Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Tue, 11 Feb 2020 02:15:22 -0400 Subject: [PATCH] Add ability to run a batch completion job in the proxy thread For very small jobs (i.e. just setting a flag or something) this is going to be faster than dispatching to a thread. --- lokimq/batch.h | 21 +++++++++++++++++++-- lokimq/lokimq.cpp | 11 +++++++++++ 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/lokimq/batch.h b/lokimq/batch.h index 6f621fa..09d15a6 100644 --- a/lokimq/batch.h +++ b/lokimq/batch.h @@ -38,7 +38,8 @@ namespace detail { enum class BatchStatus { running, // there are still jobs to run (or running) - complete, // the batch is complete but still has a completion function to call + complete, // the batch is complete but still has a completion job to call + complete_proxy, // same as `complete`, but the completion job should be invoked immediately in the proxy thread (be very careful) done // the batch is complete and has no completion function }; @@ -154,6 +155,7 @@ private: std::vector> results; CompletionFunc complete; std::size_t jobs_outstanding = 0; + bool complete_in_proxy = false; bool started = false; void check_not_started() { @@ -184,9 +186,22 @@ public: /// then jobs simply run and results are discarded. void completion(CompletionFunc comp) { check_not_started(); + if (complete) + throw std::logic_error("Completion function can only be set once"); complete = std::move(comp); } + /// Sets a completion function to invoke *IN THE PROXY THREAD* after all jobs have finished. Be + /// very, very careful: this should not be a job that takes any significant amount of CPU time + /// or can block for any reason (NO MUTEXES). + void completion_proxy(CompletionFunc comp) { + check_not_started(); + if (complete) + throw std::logic_error("Completion function can only be set once"); + complete = std::move(comp); + complete_in_proxy = true; + } + private: std::size_t size() const override { @@ -212,7 +227,9 @@ private: if (jobs_outstanding) return detail::BatchStatus::running; if (complete) - return detail::BatchStatus::complete; + return complete_in_proxy + ? detail::BatchStatus::complete_proxy + : detail::BatchStatus::complete; return detail::BatchStatus::done; } diff --git a/lokimq/lokimq.cpp b/lokimq/lokimq.cpp index 4ea80c1..7809a6e 100644 --- a/lokimq/lokimq.cpp +++ b/lokimq/lokimq.cpp @@ -1001,6 +1001,17 @@ void LokiMQ::proxy_worker_message(std::vector& parts) { auto status = run.batch->job_finished(); if (status == detail::BatchStatus::complete) { batch_jobs.emplace(run.batch, -1); + } else if (status == detail::BatchStatus::complete_proxy) { + try { + run.batch->job_completion(); // RUN DIRECTLY IN PROXY THREAD + } catch (const std::exception &e) { + // Raise these to error levels: you really shouldn't be doing anything + // complicated in an in-proxy completion function! + LMQ_LOG(error, "proxy thread caught exception when processing in-proxy completion command: ", e.what()); + } catch (...) { + LMQ_LOG(error, "proxy thread caught non-standard exception when processing in-proxy completion command"); + } + clear_job = true; } else if (status == detail::BatchStatus::done) { clear_job = true; }