diff --git a/lokimq/batch.h b/lokimq/batch.h index 5bb65f9..2faff6e 100644 --- a/lokimq/batch.h +++ b/lokimq/batch.h @@ -185,8 +185,8 @@ public: /// invocation of the jobs. /// /// \param job the callback - /// \param thread an optional TaggedThread on which this job must run - void add_job(std::function job, const TaggedThread* thread = nullptr) { + /// \param thread an optional TaggedThreadID indicating a thread in which this job must run + void add_job(std::function job, std::optional thread = std::nullopt) { check_not_started(); if (thread && thread->_id == -1) // There are some special case internal jobs where we allow this, but they use the @@ -208,7 +208,7 @@ public: /// block for any reason. This is only intended for the case where the completion job is so /// trivial that it will take less time than simply queuing the job to be executed by another /// thread. - void completion(CompletionFunc comp, const TaggedThread* thread = nullptr) { + void completion(CompletionFunc comp, std::optional thread = std::nullopt) { check_not_started(); if (complete) throw std::logic_error("Completion function can only be set once"); diff --git a/lokimq/jobs.cpp b/lokimq/jobs.cpp index 6b16dcd..72feca7 100644 --- a/lokimq/jobs.cpp +++ b/lokimq/jobs.cpp @@ -26,7 +26,7 @@ void LokiMQ::proxy_batch(detail::Batch* batch) { proxy_skip_one_poll = true; } -void LokiMQ::job(std::function f, const TaggedThread* thread) { +void LokiMQ::job(std::function f, std::optional thread) { if (thread && thread->_id == -1) throw std::logic_error{"job() cannot be used to queue an in-proxy job"}; auto* b = new Batch; @@ -107,7 +107,7 @@ void LokiMQ::_queue_timer_job(int timer_id) { auto it = timer_jobs.find(timer_id); if (it != timer_jobs.end()) it->second.running = false; - }, &LokiMQ::run_in_proxy); + }, LokiMQ::run_in_proxy); } batches.insert(b); LMQ_TRACE("b: ", b->size().first, ", ", b->size().second, "; thread = ", thread); @@ -118,7 +118,7 @@ void LokiMQ::_queue_timer_job(int timer_id) { queue.emplace(static_cast(b), 0); } -void LokiMQ::add_timer(std::function job, std::chrono::milliseconds interval, bool squelch, const TaggedThread* thread) { +void LokiMQ::add_timer(std::function job, std::chrono::milliseconds interval, bool squelch, std::optional thread) { int th_id = thread ? thread->_id : 0; if (proxy_thread.joinable()) { detail::send_control(get_control_socket(), "TIMER", bt_serialize(bt_list{{ @@ -133,7 +133,7 @@ void LokiMQ::add_timer(std::function job, std::chrono::milliseconds inte void LokiMQ::TimersDeleter::operator()(void* timers) { zmq_timers_destroy(&timers); } -TaggedThread LokiMQ::add_tagged_thread(std::string name, std::function init, std::function start) { +TaggedThreadID LokiMQ::add_tagged_thread(std::string name, std::function start) { if (proxy_thread.joinable()) throw std::logic_error{"Cannot add tagged threads after calling `start()`"}; @@ -151,7 +151,7 @@ TaggedThread LokiMQ::add_tagged_thread(std::string name, std::function i return worker_thread(id, name, std::move(start)); }}; - return {std::move(name), static_cast(run.worker_id)}; + return TaggedThreadID{static_cast(run.worker_id)}; } } diff --git a/lokimq/lokimq.h b/lokimq/lokimq.h index a88e035..f4299d8 100644 --- a/lokimq/lokimq.h +++ b/lokimq/lokimq.h @@ -86,13 +86,12 @@ static constexpr size_t MAX_COMMAND_LENGTH = 200; class CatHelper; -/// Handle for a tagged thread constructed by add_tagged_thread(...). Not directly constructible, but -/// is safe to copy. -struct TaggedThread { - const std::string name; +/// Opaque handle for a tagged thread constructed by add_tagged_thread(...). Not directly +/// constructible, but is safe (and cheap) to copy. +struct TaggedThreadID { private: const int _id; - TaggedThread(std::string name, int id) : name{std::move(name)}, _id{id} {} + explicit constexpr TaggedThreadID(int id) : _id{id} {} friend class LokiMQ; template friend class Batch; }; @@ -264,9 +263,9 @@ public: */ int STARTUP_UMASK = -1; - /// A special TaggedThread value that always refers to the proxy thread; the main use of this is + /// A special TaggedThreadID value that always refers to the proxy thread; the main use of this is /// to direct very simple batch completion jobs to be executed directly in the proxy thread. - inline static const TaggedThread run_in_proxy{"_proxy", -1}; + inline static constexpr TaggedThreadID run_in_proxy{-1}; /// Writes a message to the logging system; intended mostly for internal use. template @@ -836,10 +835,10 @@ public: * \param start - similar to init, but this is called immediately *after* the LokiMQ object has * started up and so can use LokiMQ object functionality. * - * \returns a TaggedThread object that can be passed to job(), batch(), or add_timer() to direct - * the task to the tagged thread. + * \returns a TaggedThreadID object that can be passed to job(), batch(), or add_timer() to + * direct the task to the tagged thread. */ - TaggedThread add_tagged_thread(std::string name, std::function init = nullptr, std::function start = nullptr); + TaggedThreadID add_tagged_thread(std::string name, std::function init = nullptr, std::function start = nullptr); /** * Sets the number of worker threads reserved for batch jobs. If not explicitly called then @@ -1132,7 +1131,7 @@ public: * \param thread an optional tagged thread in which this job should run. You may *not* pass the * proxy thread here. */ - void job(std::function f, const TaggedThread *thread = nullptr); + void job(std::function f, std::optional = std::nullopt); /** * Adds a timer that gets scheduled periodically in the job queue. Normally jobs are not @@ -1143,7 +1142,7 @@ public: * * \param thread specifies a thread (added with add_tagged_thread()) on which this timer must run. */ - void add_timer(std::function job, std::chrono::milliseconds interval, bool squelch = true, const TaggedThread* thread = nullptr); + void add_timer(std::function job, std::chrono::milliseconds interval, bool squelch = true, std::optional = std::nullopt); }; /// Helper class that slightly simplifies adding commands to a category. diff --git a/tests/test_tagged_threads.cpp b/tests/test_tagged_threads.cpp index 13e1763..f52691c 100644 --- a/tests/test_tagged_threads.cpp +++ b/tests/test_tagged_threads.cpp @@ -48,7 +48,7 @@ TEST_CASE("batch jobs to tagged threads", "[tagged][batch]") { } done = false; - lmq.job([&] { id = std::this_thread::get_id(); done = true; }, &t_abc); + lmq.job([&] { id = std::this_thread::get_id(); done = true; }, t_abc); wait_for([&] { return done.load(); }); { auto lock = catch_lock(); @@ -56,7 +56,7 @@ TEST_CASE("batch jobs to tagged threads", "[tagged][batch]") { } done = false; - lmq.job([&] { id = std::this_thread::get_id(); done = true; }, &t_def); + lmq.job([&] { id = std::this_thread::get_id(); done = true; }, t_def); wait_for([&] { return done.load(); }); { auto lock = catch_lock(); @@ -74,7 +74,7 @@ TEST_CASE("batch jobs to tagged threads", "[tagged][batch]") { std::this_thread::sleep_for(50ms); done = false; - lmq.job([&] { id = std::this_thread::get_id(); done = true; }, &t_abc); + lmq.job([&] { id = std::this_thread::get_id(); done = true; }, t_abc); wait_for([&] { return done.load(); }); { auto lock = catch_lock(); @@ -86,9 +86,9 @@ TEST_CASE("batch jobs to tagged threads", "[tagged][batch]") { // We can queue up a bunch of jobs which should all happen in order, and all on the abc thread. std::vector v; for (int i = 0; i < 100; i++) { - lmq.job([&] { if (std::this_thread::get_id() == id_abc) v.push_back(v.size()); }, &t_abc); + lmq.job([&] { if (std::this_thread::get_id() == id_abc) v.push_back(v.size()); }, t_abc); } - lmq.job([&] { done = true; }, &t_abc); + lmq.job([&] { done = true; }, t_abc); wait_for([&] { return done.load(); }); { auto lock = catch_lock(); @@ -125,7 +125,7 @@ TEST_CASE("batch job completion on tagged threads", "[tagged][batch-completion]" for (auto& r : result) sum += r.get(); result_sum = std::this_thread::get_id() == id_abc ? sum : -sum; - }, &t_abc); + }, t_abc); lmq.batch(std::move(batch)); wait_for([&] { return result_sum.load() != -1; }); { @@ -148,7 +148,7 @@ TEST_CASE("timer job completion on tagged threads", "[tagged][timer]") { std::atomic ticks = 0; std::atomic abc_ticks = 0; lmq.add_timer([&] { ticks++; }, 10ms); - lmq.add_timer([&] { if (std::this_thread::get_id() == id_abc) abc_ticks++; }, 10ms, true, &t_abc); + lmq.add_timer([&] { if (std::this_thread::get_id() == id_abc) abc_ticks++; }, 10ms, true, t_abc); wait_for([&] { return ticks.load() > 2 && abc_ticks > 2; }); {