mirror of https://github.com/oxen-io/oxen-mq.git
Add simpler Job subclass of Batch for simple jobs
This adds a much simpler `Job` implementation of `Batch` that is used for simple no-return, no-completion jobs (as are initiated via `omq.job(...)`). This reduces the overhead involved in constructing/destroying the Batch instance for these common jobs.
This commit is contained in:
parent
5c7f6504d2
commit
3a51713396
|
@ -266,6 +266,52 @@ private:
|
|||
}
|
||||
};
|
||||
|
||||
// Similar to Batch<void>, but doesn't support a completion function and only handles a single task.
|
||||
class Job final : private detail::Batch {
|
||||
friend class OxenMQ;
|
||||
public:
|
||||
/// Constructs the Job to run a single task. Takes any callable invokable with no arguments and
|
||||
/// having no return value. The task will be scheduled and run when the next worker thread is
|
||||
/// available. Any exceptions thrown by the job will be caught and squelched (the exception
|
||||
/// terminates/completes the job).
|
||||
|
||||
explicit Job(std::function<void()> f, std::optional<TaggedThreadID> thread = std::nullopt)
|
||||
: Job{std::move(f), thread ? thread->_id : 0}
|
||||
{
|
||||
if (thread && thread->_id == -1)
|
||||
// There are some special case internal jobs where we allow this, but they use the
|
||||
// private ctor below that doesn't have this check.
|
||||
throw std::logic_error{"Cannot add a proxy thread job -- this makes no sense"};
|
||||
}
|
||||
|
||||
// movable
|
||||
Job(Job&&) = default;
|
||||
Job &operator=(Job&&) = default;
|
||||
|
||||
// non-copyable
|
||||
Job(const Job&) = delete;
|
||||
Job &operator=(const Job&) = delete;
|
||||
|
||||
private:
|
||||
explicit Job(std::function<void()> f, int thread_id)
|
||||
: job{std::move(f), thread_id} {}
|
||||
|
||||
std::pair<std::function<void()>, int> job;
|
||||
bool done = false;
|
||||
|
||||
std::pair<size_t, bool> size() const override { return {1, job.second != 0}; }
|
||||
std::vector<int> threads() const override { return {job.second}; }
|
||||
|
||||
void run_job(const int /*i*/) override {
|
||||
try { job.first(); }
|
||||
catch (...) {}
|
||||
}
|
||||
|
||||
detail::BatchStatus job_finished() override { return {detail::BatchState::done, 0}; }
|
||||
|
||||
void job_completion() override {} // Never called because we return ::done (not ::complete) above.
|
||||
|
||||
};
|
||||
|
||||
template <typename R>
|
||||
void OxenMQ::batch(Batch<R>&& batch) {
|
||||
|
|
|
@ -29,17 +29,15 @@ void OxenMQ::proxy_batch(detail::Batch* batch) {
|
|||
void OxenMQ::job(std::function<void()> f, std::optional<TaggedThreadID> thread) {
|
||||
if (thread && thread->_id == -1)
|
||||
throw std::logic_error{"job() cannot be used to queue an in-proxy job"};
|
||||
auto* b = new Batch<void>;
|
||||
b->add_job(std::move(f), thread);
|
||||
auto* baseptr = static_cast<detail::Batch*>(b);
|
||||
auto* j = new Job(std::move(f), thread);
|
||||
auto* baseptr = static_cast<detail::Batch*>(j);
|
||||
detail::send_control(get_control_socket(), "BATCH", oxenc::bt_serialize(reinterpret_cast<uintptr_t>(baseptr)));
|
||||
}
|
||||
|
||||
void OxenMQ::proxy_schedule_reply_job(std::function<void()> f) {
|
||||
auto* b = new Batch<void>;
|
||||
b->add_job(std::move(f));
|
||||
batches.insert(b);
|
||||
reply_jobs.emplace(static_cast<detail::Batch*>(b), 0);
|
||||
auto* j = new Job(std::move(f));
|
||||
reply_jobs.emplace_back(static_cast<detail::Batch*>(j), 0);
|
||||
batches.insert(j);
|
||||
proxy_skip_one_poll = true;
|
||||
}
|
||||
|
||||
|
@ -98,11 +96,12 @@ void OxenMQ::_queue_timer_job(int timer_id) {
|
|||
return;
|
||||
}
|
||||
|
||||
auto* b = new Batch<void>;
|
||||
b->add_job(func, thread);
|
||||
detail::Batch* b;
|
||||
if (squelch) {
|
||||
auto* bv = new Batch<void>;
|
||||
bv->add_job(func, thread);
|
||||
running = true;
|
||||
b->completion([this,timer_id](auto results) {
|
||||
bv->completion([this,timer_id](auto results) {
|
||||
try { results[0].get(); }
|
||||
catch (const std::exception &e) { OMQ_LOG(warn, "timer job ", timer_id, " raised an exception: ", e.what()); }
|
||||
catch (...) { OMQ_LOG(warn, "timer job ", timer_id, " raised a non-std exception"); }
|
||||
|
@ -110,6 +109,9 @@ void OxenMQ::_queue_timer_job(int timer_id) {
|
|||
if (it != timer_jobs.end())
|
||||
it->second.running = false;
|
||||
}, OxenMQ::run_in_proxy);
|
||||
b = bv;
|
||||
} else {
|
||||
b = new Job(func, thread);
|
||||
}
|
||||
batches.insert(b);
|
||||
OMQ_TRACE("b: ", b->size().first, ", ", b->size().second, "; thread = ", thread);
|
||||
|
|
|
@ -106,6 +106,7 @@ private:
|
|||
explicit constexpr TaggedThreadID(int id) : _id{id} {}
|
||||
friend class OxenMQ;
|
||||
template <typename R> friend class Batch;
|
||||
friend class Job;
|
||||
};
|
||||
|
||||
/// Opaque handler for a timer constructed by add_timer(...). Safe (and cheap) to copy. The only
|
||||
|
|
Loading…
Reference in New Issue