From 8caab97355d5d56f32c0c8e2dd1aed487af5e549 Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Fri, 5 Jun 2020 14:04:32 -0300 Subject: [PATCH] Rename TaggedThread to TaggedThreadID, drop .name attribute This renames the class to make it clearer what it does, and drops the .name attribute from it so that it can cheaply be passed around. This then means it can be cheaply passed by value (using std::optionals) rather than by pointer when specifying a thread. --- lokimq/batch.h | 6 +++--- lokimq/jobs.cpp | 10 +++++----- lokimq/lokimq.h | 23 +++++++++++------------ tests/test_tagged_threads.cpp | 14 +++++++------- 4 files changed, 26 insertions(+), 27 deletions(-) diff --git a/lokimq/batch.h b/lokimq/batch.h index 5bb65f9..2faff6e 100644 --- a/lokimq/batch.h +++ b/lokimq/batch.h @@ -185,8 +185,8 @@ public: /// invocation of the jobs. /// /// \param job the callback - /// \param thread an optional TaggedThread on which this job must run - void add_job(std::function job, const TaggedThread* thread = nullptr) { + /// \param thread an optional TaggedThreadID indicating a thread in which this job must run + void add_job(std::function job, std::optional thread = std::nullopt) { check_not_started(); if (thread && thread->_id == -1) // There are some special case internal jobs where we allow this, but they use the @@ -208,7 +208,7 @@ public: /// block for any reason. This is only intended for the case where the completion job is so /// trivial that it will take less time than simply queuing the job to be executed by another /// thread. - void completion(CompletionFunc comp, const TaggedThread* thread = nullptr) { + void completion(CompletionFunc comp, std::optional thread = std::nullopt) { check_not_started(); if (complete) throw std::logic_error("Completion function can only be set once"); diff --git a/lokimq/jobs.cpp b/lokimq/jobs.cpp index 6b16dcd..72feca7 100644 --- a/lokimq/jobs.cpp +++ b/lokimq/jobs.cpp @@ -26,7 +26,7 @@ void LokiMQ::proxy_batch(detail::Batch* batch) { proxy_skip_one_poll = true; } -void LokiMQ::job(std::function f, const TaggedThread* thread) { +void LokiMQ::job(std::function f, std::optional thread) { if (thread && thread->_id == -1) throw std::logic_error{"job() cannot be used to queue an in-proxy job"}; auto* b = new Batch; @@ -107,7 +107,7 @@ void LokiMQ::_queue_timer_job(int timer_id) { auto it = timer_jobs.find(timer_id); if (it != timer_jobs.end()) it->second.running = false; - }, &LokiMQ::run_in_proxy); + }, LokiMQ::run_in_proxy); } batches.insert(b); LMQ_TRACE("b: ", b->size().first, ", ", b->size().second, "; thread = ", thread); @@ -118,7 +118,7 @@ void LokiMQ::_queue_timer_job(int timer_id) { queue.emplace(static_cast(b), 0); } -void LokiMQ::add_timer(std::function job, std::chrono::milliseconds interval, bool squelch, const TaggedThread* thread) { +void LokiMQ::add_timer(std::function job, std::chrono::milliseconds interval, bool squelch, std::optional thread) { int th_id = thread ? thread->_id : 0; if (proxy_thread.joinable()) { detail::send_control(get_control_socket(), "TIMER", bt_serialize(bt_list{{ @@ -133,7 +133,7 @@ void LokiMQ::add_timer(std::function job, std::chrono::milliseconds inte void LokiMQ::TimersDeleter::operator()(void* timers) { zmq_timers_destroy(&timers); } -TaggedThread LokiMQ::add_tagged_thread(std::string name, std::function init, std::function start) { +TaggedThreadID LokiMQ::add_tagged_thread(std::string name, std::function start) { if (proxy_thread.joinable()) throw std::logic_error{"Cannot add tagged threads after calling `start()`"}; @@ -151,7 +151,7 @@ TaggedThread LokiMQ::add_tagged_thread(std::string name, std::function i return worker_thread(id, name, std::move(start)); }}; - return {std::move(name), static_cast(run.worker_id)}; + return TaggedThreadID{static_cast(run.worker_id)}; } } diff --git a/lokimq/lokimq.h b/lokimq/lokimq.h index a88e035..f4299d8 100644 --- a/lokimq/lokimq.h +++ b/lokimq/lokimq.h @@ -86,13 +86,12 @@ static constexpr size_t MAX_COMMAND_LENGTH = 200; class CatHelper; -/// Handle for a tagged thread constructed by add_tagged_thread(...). Not directly constructible, but -/// is safe to copy. -struct TaggedThread { - const std::string name; +/// Opaque handle for a tagged thread constructed by add_tagged_thread(...). Not directly +/// constructible, but is safe (and cheap) to copy. +struct TaggedThreadID { private: const int _id; - TaggedThread(std::string name, int id) : name{std::move(name)}, _id{id} {} + explicit constexpr TaggedThreadID(int id) : _id{id} {} friend class LokiMQ; template friend class Batch; }; @@ -264,9 +263,9 @@ public: */ int STARTUP_UMASK = -1; - /// A special TaggedThread value that always refers to the proxy thread; the main use of this is + /// A special TaggedThreadID value that always refers to the proxy thread; the main use of this is /// to direct very simple batch completion jobs to be executed directly in the proxy thread. - inline static const TaggedThread run_in_proxy{"_proxy", -1}; + inline static constexpr TaggedThreadID run_in_proxy{-1}; /// Writes a message to the logging system; intended mostly for internal use. template @@ -836,10 +835,10 @@ public: * \param start - similar to init, but this is called immediately *after* the LokiMQ object has * started up and so can use LokiMQ object functionality. * - * \returns a TaggedThread object that can be passed to job(), batch(), or add_timer() to direct - * the task to the tagged thread. + * \returns a TaggedThreadID object that can be passed to job(), batch(), or add_timer() to + * direct the task to the tagged thread. */ - TaggedThread add_tagged_thread(std::string name, std::function init = nullptr, std::function start = nullptr); + TaggedThreadID add_tagged_thread(std::string name, std::function init = nullptr, std::function start = nullptr); /** * Sets the number of worker threads reserved for batch jobs. If not explicitly called then @@ -1132,7 +1131,7 @@ public: * \param thread an optional tagged thread in which this job should run. You may *not* pass the * proxy thread here. */ - void job(std::function f, const TaggedThread *thread = nullptr); + void job(std::function f, std::optional = std::nullopt); /** * Adds a timer that gets scheduled periodically in the job queue. Normally jobs are not @@ -1143,7 +1142,7 @@ public: * * \param thread specifies a thread (added with add_tagged_thread()) on which this timer must run. */ - void add_timer(std::function job, std::chrono::milliseconds interval, bool squelch = true, const TaggedThread* thread = nullptr); + void add_timer(std::function job, std::chrono::milliseconds interval, bool squelch = true, std::optional = std::nullopt); }; /// Helper class that slightly simplifies adding commands to a category. diff --git a/tests/test_tagged_threads.cpp b/tests/test_tagged_threads.cpp index 13e1763..f52691c 100644 --- a/tests/test_tagged_threads.cpp +++ b/tests/test_tagged_threads.cpp @@ -48,7 +48,7 @@ TEST_CASE("batch jobs to tagged threads", "[tagged][batch]") { } done = false; - lmq.job([&] { id = std::this_thread::get_id(); done = true; }, &t_abc); + lmq.job([&] { id = std::this_thread::get_id(); done = true; }, t_abc); wait_for([&] { return done.load(); }); { auto lock = catch_lock(); @@ -56,7 +56,7 @@ TEST_CASE("batch jobs to tagged threads", "[tagged][batch]") { } done = false; - lmq.job([&] { id = std::this_thread::get_id(); done = true; }, &t_def); + lmq.job([&] { id = std::this_thread::get_id(); done = true; }, t_def); wait_for([&] { return done.load(); }); { auto lock = catch_lock(); @@ -74,7 +74,7 @@ TEST_CASE("batch jobs to tagged threads", "[tagged][batch]") { std::this_thread::sleep_for(50ms); done = false; - lmq.job([&] { id = std::this_thread::get_id(); done = true; }, &t_abc); + lmq.job([&] { id = std::this_thread::get_id(); done = true; }, t_abc); wait_for([&] { return done.load(); }); { auto lock = catch_lock(); @@ -86,9 +86,9 @@ TEST_CASE("batch jobs to tagged threads", "[tagged][batch]") { // We can queue up a bunch of jobs which should all happen in order, and all on the abc thread. std::vector v; for (int i = 0; i < 100; i++) { - lmq.job([&] { if (std::this_thread::get_id() == id_abc) v.push_back(v.size()); }, &t_abc); + lmq.job([&] { if (std::this_thread::get_id() == id_abc) v.push_back(v.size()); }, t_abc); } - lmq.job([&] { done = true; }, &t_abc); + lmq.job([&] { done = true; }, t_abc); wait_for([&] { return done.load(); }); { auto lock = catch_lock(); @@ -125,7 +125,7 @@ TEST_CASE("batch job completion on tagged threads", "[tagged][batch-completion]" for (auto& r : result) sum += r.get(); result_sum = std::this_thread::get_id() == id_abc ? sum : -sum; - }, &t_abc); + }, t_abc); lmq.batch(std::move(batch)); wait_for([&] { return result_sum.load() != -1; }); { @@ -148,7 +148,7 @@ TEST_CASE("timer job completion on tagged threads", "[tagged][timer]") { std::atomic ticks = 0; std::atomic abc_ticks = 0; lmq.add_timer([&] { ticks++; }, 10ms); - lmq.add_timer([&] { if (std::this_thread::get_id() == id_abc) abc_ticks++; }, 10ms, true, &t_abc); + lmq.add_timer([&] { if (std::this_thread::get_id() == id_abc) abc_ticks++; }, 10ms, true, t_abc); wait_for([&] { return ticks.load() > 2 && abc_ticks > 2; }); {