Tagged threads for jobs, batches, and timers

This adds to ability to have lokimq manage specific threads to which
jobs (individual, batch jobs, batch completions, or timers) can be
directed to.  This allows dedicating a thread to some slow or
thread-unsafe action where you can dump jobs to the tagged thread as
a method of lockless job queuing.
This commit is contained in:
Jason Rhinelander 2020-06-04 23:32:28 -03:00
parent 6356421488
commit 29380922bf
8 changed files with 508 additions and 126 deletions

View file

@ -36,18 +36,25 @@ namespace lokimq {
namespace detail {
enum class BatchStatus {
enum class BatchState {
running, // there are still jobs to run (or running)
complete, // the batch is complete but still has a completion job to call
complete_proxy, // same as `complete`, but the completion job should be invoked immediately in the proxy thread (be very careful)
done // the batch is complete and has no completion function
};
struct BatchStatus {
BatchState state;
int thread;
};
// Virtual base class for Batch<R>
class Batch {
public:
// Returns the number of jobs in this batch
virtual size_t size() const = 0;
// Returns the number of jobs in this batch and whether any of them are thread-specific
virtual std::pair<size_t, bool> size() const = 0;
// Returns a vector of exactly the same length of size().first containing the tagged thread ids
// of the batch jobs or 0 for general jobs.
virtual std::vector<int> threads() const = 0;
// Called in a worker thread to run the job
virtual void run_job(int i) = 0;
// Called in the main proxy thread when the worker returns from finishing a job. The return
@ -151,12 +158,13 @@ public:
Batch &operator=(const Batch&) = delete;
private:
std::vector<std::function<R()>> jobs;
std::vector<std::pair<std::function<R()>, int>> jobs;
std::vector<job_result<R>> results;
CompletionFunc complete;
std::size_t jobs_outstanding = 0;
bool complete_in_proxy = false;
int complete_in_thread = 0;
bool started = false;
bool tagged_thread_jobs = false;
void check_not_started() {
if (started)
@ -175,39 +183,61 @@ public:
/// available. The called function may throw exceptions (which will be propagated to the
/// completion function through the job_result values). There is no guarantee on the order of
/// invocation of the jobs.
void add_job(std::function<R()> job) {
///
/// \param job the callback
/// \param thread an optional TaggedThread on which this job must run
void add_job(std::function<R()> job, const TaggedThread* thread = nullptr) {
check_not_started();
jobs.emplace_back(std::move(job));
results.emplace_back();
jobs_outstanding++;
if (thread && thread->_id == -1)
// There are some special case internal jobs where we allow this, but they use the
// private method below that doesn't have this check.
throw std::logic_error{"Cannot add a proxy thread batch job -- this makes no sense"};
add_job(std::move(job), thread ? thread->_id : 0);
}
/// Sets the completion function to invoke after all jobs have finished. If this is not set
/// then jobs simply run and results are discarded.
void completion(CompletionFunc comp) {
///
/// \param comp - function to call when all jobs have finished
/// \param thread - optional tagged thread in which to schedule the completion job. If not
/// provided then the completion job is scheduled in the pool of batch job threads.
///
/// `thread` can be provided the value &LokiMQ::run_in_proxy to invoke the completion function
/// *IN THE PROXY THREAD* itself after all jobs have finished. Be very, very careful: this
/// should be a nearly trivial job that does not require any substantial CPU time and does not
/// block for any reason. This is only intended for the case where the completion job is so
/// trivial that it will take less time than simply queuing the job to be executed by another
/// thread.
void completion(CompletionFunc comp, const TaggedThread* thread = nullptr) {
check_not_started();
if (complete)
throw std::logic_error("Completion function can only be set once");
complete = std::move(comp);
}
/// Sets a completion function to invoke *IN THE PROXY THREAD* after all jobs have finished. Be
/// very, very careful: this should not be a job that takes any significant amount of CPU time
/// or can block for any reason (NO MUTEXES).
void completion_proxy(CompletionFunc comp) {
check_not_started();
if (complete)
throw std::logic_error("Completion function can only be set once");
complete = std::move(comp);
complete_in_proxy = true;
complete_in_thread = thread ? thread->_id : 0;
}
private:
std::size_t size() const override {
return jobs.size();
void add_job(std::function<R()> job, int thread_id) {
jobs.emplace_back(std::move(job), thread_id);
results.emplace_back();
jobs_outstanding++;
if (thread_id != 0)
tagged_thread_jobs = true;
}
std::pair<std::size_t, bool> size() const override {
return {jobs.size(), tagged_thread_jobs};
}
std::vector<int> threads() const override {
std::vector<int> t;
t.reserve(jobs.size());
for (auto& j : jobs)
t.push_back(j.second);
return t;
};
template <typename S = R>
void set_value(job_result<S>& r, std::function<S()>& f) { r.set_value(f()); }
void set_value(job_result<void>&, std::function<void()>& f) { f(); }
@ -216,7 +246,7 @@ private:
// called by worker thread
auto& r = results[i];
try {
set_value(r, jobs[i]);
set_value(r, jobs[i].first);
} catch (...) {
r.set_exception(std::current_exception());
}
@ -225,12 +255,10 @@ private:
detail::BatchStatus job_finished() override {
--jobs_outstanding;
if (jobs_outstanding)
return detail::BatchStatus::running;
return {detail::BatchState::running, 0};
if (complete)
return complete_in_proxy
? detail::BatchStatus::complete_proxy
: detail::BatchStatus::complete;
return detail::BatchStatus::done;
return {detail::BatchState::complete, complete_in_thread};
return {detail::BatchState::done, 0};
}
void job_completion() override {
@ -241,7 +269,7 @@ private:
template <typename R>
void LokiMQ::batch(Batch<R>&& batch) {
if (batch.size() == 0)
if (batch.size().first == 0)
throw std::logic_error("Cannot batch a a job batch with 0 jobs");
// Need to send this over to the proxy thread via the base class pointer. It assumes ownership.
auto* baseptr = static_cast<detail::Batch*>(new Batch<R>(std::move(batch)));

View file

@ -6,15 +6,31 @@ namespace lokimq {
void LokiMQ::proxy_batch(detail::Batch* batch) {
batches.insert(batch);
const int jobs = batch->size();
for (int i = 0; i < jobs; i++)
batch_jobs.emplace(batch, i);
const auto [jobs, tagged_threads] = batch->size();
LMQ_TRACE("proxy queuing batch job with ", jobs, " jobs", tagged_threads ? " (job uses tagged thread(s))" : "");
if (!tagged_threads) {
for (size_t i = 0; i < jobs; i++)
batch_jobs.emplace(batch, i);
} else {
// Some (or all) jobs have a specific thread target so queue any such jobs in the tagged
// worker queue.
auto threads = batch->threads();
for (size_t i = 0; i < jobs; i++) {
auto& jobs = threads[i] > 0
? std::get<std::queue<batch_job>>(tagged_workers[threads[i] - 1])
: batch_jobs;
jobs.emplace(batch, i);
}
}
proxy_skip_one_poll = true;
}
void LokiMQ::job(std::function<void()> f) {
void LokiMQ::job(std::function<void()> f, const TaggedThread* thread) {
if (thread && thread->_id == -1)
throw std::logic_error{"job() cannot be used to queue an in-proxy job"};
auto* b = new Batch<void>;
b->add_job(std::move(f));
b->add_job(std::move(f), thread);
auto* baseptr = static_cast<detail::Batch*>(b);
detail::send_control(get_control_socket(), "BATCH", bt_serialize(reinterpret_cast<uintptr_t>(baseptr)));
}
@ -38,7 +54,7 @@ void LokiMQ::proxy_run_batch_jobs(std::queue<batch_job>& jobs, const int reserve
// Called either within the proxy thread, or before the proxy thread has been created; actually adds
// the timer. If the timer object hasn't been set up yet it gets set up here.
void LokiMQ::proxy_timer(std::function<void()> job, std::chrono::milliseconds interval, bool squelch) {
void LokiMQ::proxy_timer(std::function<void()> job, std::chrono::milliseconds interval, bool squelch, int thread) {
if (!timers)
timers.reset(zmq_timers_new());
@ -48,16 +64,17 @@ void LokiMQ::proxy_timer(std::function<void()> job, std::chrono::milliseconds in
this);
if (timer_id == -1)
throw zmq::error_t{};
timer_jobs[timer_id] = std::make_tuple(std::move(job), squelch, false);
timer_jobs[timer_id] = { std::move(job), squelch, false, thread };
}
void LokiMQ::proxy_timer(bt_list_consumer timer_data) {
std::unique_ptr<std::function<void()>> func{reinterpret_cast<std::function<void()>*>(timer_data.consume_integer<uintptr_t>())};
auto interval = std::chrono::milliseconds{timer_data.consume_integer<uint64_t>()};
auto squelch = timer_data.consume_integer<bool>();
auto thread = timer_data.consume_integer<int>();
if (!timer_data.is_finished())
throw std::runtime_error("Internal error: proxied timer request contains unexpected data");
proxy_timer(std::move(*func), interval, squelch);
proxy_timer(std::move(*func), interval, squelch, thread);
}
void LokiMQ::_queue_timer_job(int timer_id) {
@ -66,43 +83,75 @@ void LokiMQ::_queue_timer_job(int timer_id) {
LMQ_LOG(warn, "Could not find timer job ", timer_id);
return;
}
auto& timer = it->second;
auto& squelch = std::get<1>(timer);
auto& running = std::get<2>(timer);
auto& [func, squelch, running, thread] = it->second;
if (squelch && running) {
LMQ_LOG(debug, "Not running timer job ", timer_id, " because a job for that timer is still running");
return;
}
if (thread == -1) { // Run directly in proxy thread
try { func(); }
catch (const std::exception &e) { LMQ_LOG(warn, "timer job ", timer_id, " raised an exception: ", e.what()); }
catch (...) { LMQ_LOG(warn, "timer job ", timer_id, " raised a non-std exception"); }
return;
}
auto* b = new Batch<void>;
b->add_job(std::get<0>(timer));
b->add_job(func, thread);
if (squelch) {
running = true;
b->completion_proxy([this,timer_id](auto results) {
b->completion([this,timer_id](auto results) {
try { results[0].get(); }
catch (const std::exception &e) { LMQ_LOG(warn, "timer job ", timer_id, " raised an exception: ", e.what()); }
catch (...) { LMQ_LOG(warn, "timer job ", timer_id, " raised a non-std exception"); }
auto it = timer_jobs.find(timer_id);
if (it != timer_jobs.end())
std::get<2>(it->second)/*running*/ = false;
});
it->second.running = false;
}, &LokiMQ::run_in_proxy);
}
batches.insert(b);
batch_jobs.emplace(static_cast<detail::Batch*>(b), 0);
assert(b->size() == 1);
LMQ_TRACE("b: ", b->size().first, ", ", b->size().second, "; thread = ", thread);
assert(b->size() == std::make_pair(size_t{1}, thread > 0));
auto& queue = thread > 0
? std::get<std::queue<batch_job>>(tagged_workers[thread - 1])
: batch_jobs;
queue.emplace(static_cast<detail::Batch*>(b), 0);
}
void LokiMQ::add_timer(std::function<void()> job, std::chrono::milliseconds interval, bool squelch) {
void LokiMQ::add_timer(std::function<void()> job, std::chrono::milliseconds interval, bool squelch, const TaggedThread* thread) {
int th_id = thread ? thread->_id : 0;
if (proxy_thread.joinable()) {
detail::send_control(get_control_socket(), "TIMER", bt_serialize(bt_list{{
detail::serialize_object(std::move(job)),
interval.count(),
squelch}}));
squelch,
th_id}}));
} else {
proxy_timer(std::move(job), interval, squelch);
proxy_timer(std::move(job), interval, squelch, th_id);
}
}
void LokiMQ::TimersDeleter::operator()(void* timers) { zmq_timers_destroy(&timers); }
TaggedThread LokiMQ::add_tagged_thread(std::string name, std::function<void()> init, std::function<void()> start) {
if (proxy_thread.joinable())
throw std::logic_error{"Cannot add tagged threads after calling `start()`"};
if (name == "_proxy"sv || name.empty() || name.find('\0') != std::string::npos)
throw std::logic_error{"Invalid tagged thread name `" + name + "'"};
auto& [run, busy, queue] = tagged_workers.emplace_back();
busy = false;
run.worker_id = tagged_workers.size(); // We want index + 1 (b/c 0 is used for non-tagged jobs)
run.worker_routing_id = "t" + std::to_string(run.worker_id);
LMQ_TRACE("Created new tagged thread ", name, " with routing id ", run.worker_routing_id);
run.worker_thread = std::thread{[this, id=run.worker_id, name, init=std::move(init), start=std::move(start)] {
if (init) init();
return worker_thread(id, name, std::move(start));
}};
return {std::move(name), static_cast<int>(run.worker_id)};
}
}

View file

@ -348,6 +348,7 @@ LokiMQ::run_info& LokiMQ::run_info::load(category* cat_, std::string command_, C
std::vector<zmq::message_t> data_parts_, const std::pair<CommandCallback, bool>* callback_) {
is_batch_job = false;
is_reply_job = false;
is_tagged_thread_job = false;
cat = cat_;
command = std::move(command_);
conn = std::move(conn_);
@ -363,9 +364,10 @@ LokiMQ::run_info& LokiMQ::run_info::load(pending_command&& pending) {
std::move(pending.remote), std::move(pending.data_parts), pending.callback);
}
LokiMQ::run_info& LokiMQ::run_info::load(batch_job&& bj, bool reply_job) {
LokiMQ::run_info& LokiMQ::run_info::load(batch_job&& bj, bool reply_job, int tagged_thread) {
is_batch_job = true;
is_reply_job = reply_job;
is_tagged_thread_job = tagged_thread > 0;
batch_jobno = bj.second;
batch = bj.first;
return *this;

View file

@ -86,6 +86,17 @@ static constexpr size_t MAX_COMMAND_LENGTH = 200;
class CatHelper;
/// Handle for a tagged thread constructed by add_tagged_thread(...). Not directly constructible, but
/// is safe to copy.
struct TaggedThread {
const std::string name;
private:
const int _id;
TaggedThread(std::string name, int id) : name{std::move(name)}, _id{id} {}
friend class LokiMQ;
template <typename R> friend class Batch;
};
/**
* Class that handles LokiMQ listeners, connections, proxying, and workers. An application
* typically has just one instance of this class.
@ -253,6 +264,10 @@ public:
*/
int STARTUP_UMASK = -1;
/// A special TaggedThread value that always refers to the proxy thread; the main use of this is
/// to direct very simple batch completion jobs to be executed directly in the proxy thread.
inline static const TaggedThread run_in_proxy{"_proxy", -1};
private:
/// The lookup function that tells us where to connect to a peer, or empty if not found.
@ -382,7 +397,8 @@ private:
/// Timers. TODO: once cppzmq adds an interface around the zmq C timers API then switch to it.
struct TimersDeleter { void operator()(void* timers); };
std::unordered_map<int, std::tuple<std::function<void()>, bool, bool>> timer_jobs; // id => {func, squelch, running}
struct timer_data { std::function<void()> function; bool squelch; bool running; int thread; };
std::unordered_map<int, timer_data> timer_jobs;
std::unique_ptr<void, TimersDeleter> timers;
public:
// This needs to be public because we have to be able to call it from a plain C function.
@ -408,8 +424,8 @@ private:
/// Number of active workers
int active_workers() const { return workers.size() - idle_workers.size(); }
/// Worker thread loop
void worker_thread(unsigned int index);
/// Worker thread loop. Tagged and start are provided for a tagged worker thread.
void worker_thread(unsigned int index, std::optional<std::string> tagged = std::nullopt, std::function<void()> start = nullptr);
/// If set, skip polling for one proxy loop iteration (set when we know we have something
/// processible without having to shove it onto a socket, such as scheduling an internal job).
@ -506,7 +522,8 @@ private:
/// Currently active batch/reply jobs; this is the container that owns the Batch instances
std::unordered_set<detail::Batch*> batches;
/// Individual batch jobs waiting to run
/// Individual batch jobs waiting to run; .second is the 0-n batch number or -1 for the
/// completion job
using batch_job = std::pair<detail::Batch*, int>;
std::queue<batch_job> batch_jobs, reply_jobs;
int batch_jobs_active = 0;
@ -526,7 +543,7 @@ private:
void proxy_timer(bt_list_consumer timer_data);
/// Same, but deserialized
void proxy_timer(std::function<void()> job, std::chrono::milliseconds interval, bool squelch);
void proxy_timer(std::function<void()> job, std::chrono::milliseconds interval, bool squelch, int thread);
/// ZAP (https://rfc.zeromq.org/spec:27/ZAP/) authentication handler; this does non-blocking
/// processing of any waiting authentication requests for new incoming connections.
@ -616,6 +633,7 @@ private:
struct run_info {
bool is_batch_job = false;
bool is_reply_job = false;
bool is_tagged_thread_job = false;
// If is_batch_job is false then these will be set appropriate (if is_batch_job is true then
// these shouldn't be accessed and likely contain stale data).
@ -637,8 +655,8 @@ private:
// These belong to the proxy thread and must not be accessed by a worker:
std::thread worker_thread;
size_t worker_id; // The index in `workers`
std::string worker_routing_id; // "w123" where 123 == worker_id
size_t worker_id; // The index in `workers` (0-n) or index+1 in `tagged_workers` (1-n)
std::string worker_routing_id; // "w123" where 123 == worker_id; "n123" for tagged threads.
/// Loads the run info with an incoming command
run_info& load(category* cat, std::string command, ConnectionID conn, Access access, std::string remote,
@ -647,7 +665,7 @@ private:
/// Loads the run info with a stored pending command
run_info& load(pending_command&& pending);
/// Loads the run info with a batch job
run_info& load(batch_job&& bj, bool reply_job = false);
run_info& load(batch_job&& bj, bool reply_job = false, int tagged_thread = 0);
};
/// Data passed to workers for the RUN command. The proxy thread sets elements in this before
/// sending RUN to a worker then the worker uses it to get call info, and only allocates it
@ -655,6 +673,11 @@ private:
/// change it.
std::vector<run_info> workers;
/// Workers that are reserved for tagged thread tasks (as created with add_tagged_thread). The
/// queue here is similar to worker_jobs, but contains only the tagged thread's jobs. The bool
/// is whether the worker is currently busy (true) or available (false).
std::vector<std::tuple<run_info, bool, std::queue<batch_job>>> tagged_workers;
public:
/**
* LokiMQ constructor. This constructs the object but does not start it; you will typically
@ -793,6 +816,31 @@ public:
*/
void add_command_alias(std::string from, std::string to);
/** Creates a "tagged thread" and starts it immediately. A tagged thread is one that batches,
* jobs, and timer jobs can be sent to by specifically, typically to perform coordination of
* some thread-unsafe work.
*
* Tagged threads will *only* process jobs sent specifically to them; they do not participate in
* the thread pool used for regular jobs. Each tagged thread also has its own job queue
* completely separate from any other jobs.
*
* Tagged threads must be created *before* `start()` is called. The name will be used to set the
* thread name in the process table (if supported on the OS).
*
* \param name - the name of the thread; will be used in log messages and (if supported by the
* OS) as the system thread name.
* \param init - an optional callback to invoke from the thread immediately after creating it.
* This can be used to perform initialization, however should not depend on LokiMQ having
* started.
*
* \param start - similar to init, but this is called immediately *after* the LokiMQ object has
* started up and so can use LokiMQ object functionality.
*
* \returns a TaggedThread object that can be passed to job(), batch(), or add_timer() to direct
* the task to the tagged thread.
*/
TaggedThread add_tagged_thread(std::string name, std::function<void()> init = nullptr, std::function<void()> start = nullptr);
/**
* Sets the number of worker threads reserved for batch jobs. If not explicitly called then
* this defaults to half the general worker threads configured (rounded up). This works exactly
@ -1079,8 +1127,12 @@ public:
/**
* Queues a single job to be executed with no return value. This is a shortcut for creating and
* submitting a single-job, no-completion-function batch job.
*
* \param f the callback to invoke
* \param thread an optional tagged thread in which this job should run. You may *not* pass the
* proxy thread here.
*/
void job(std::function<void()> f);
void job(std::function<void()> f, const TaggedThread *thread = nullptr);
/**
* Adds a timer that gets scheduled periodically in the job queue. Normally jobs are not
@ -1088,8 +1140,10 @@ public:
* previously scheduled callback of the job has not yet completed. If you want to override this
* (so that, under heavy load or long jobs, there can be more than one of the same job scheduled
* or running at a time) then specify `squelch` as `false`.
*
* \param thread specifies a thread (added with add_tagged_thread()) on which this timer must run.
*/
void add_timer(std::function<void()> job, std::chrono::milliseconds interval, bool squelch = true);
void add_timer(std::function<void()> job, std::chrono::milliseconds interval, bool squelch = true, const TaggedThread* thread = nullptr);
};
/// Helper class that slightly simplifies adding commands to a category.

View file

@ -14,6 +14,7 @@ void LokiMQ::proxy_quit() {
LMQ_LOG(debug, "Received quit command, shutting down proxy thread");
assert(std::none_of(workers.begin(), workers.end(), [](auto& worker) { return worker.worker_thread.joinable(); }));
assert(std::none_of(tagged_workers.begin(), tagged_workers.end(), [](auto& worker) { return std::get<0>(worker).worker_thread.joinable(); }));
command.setsockopt<int>(ZMQ_LINGER, 0);
command.close();
@ -289,6 +290,9 @@ void LokiMQ::proxy_control_message(std::vector<zmq::message_t>& parts) {
for (const auto &route : idle_workers)
route_control(workers_socket, workers[route].worker_routing_id, "QUIT");
idle_workers.clear();
for (auto& [run, busy, queue] : tagged_workers)
if (!busy)
route_control(workers_socket, run.worker_routing_id, "QUIT");
return;
}
}
@ -329,6 +333,7 @@ void LokiMQ::proxy_loop() {
LMQ_LOG(debug, " - ", cat.first, ": ", cat.second.reserved_threads);
LMQ_LOG(debug, " - (batch jobs): ", batch_jobs_reserved);
LMQ_LOG(debug, " - (reply jobs): ", reply_jobs_reserved);
LMQ_LOG(debug, "Plus ", tagged_workers.size(), " tagged worker threads");
}
workers.reserve(max_workers);
@ -392,12 +397,19 @@ void LokiMQ::proxy_loop() {
throw zmq::error_t{};
}
// Tell any tagged workers to go ahead with startup:
for (auto& w : tagged_workers) {
LMQ_LOG(debug, "Telling tagged thread worker ", std::get<run_info>(w).worker_routing_id, " to finish startup");
route_control(workers_socket, std::get<run_info>(w).worker_routing_id, "START");
}
std::vector<zmq::message_t> parts;
while (true) {
std::chrono::milliseconds poll_timeout;
if (max_workers == 0) { // Will be 0 only if we are quitting
if (std::none_of(workers.begin(), workers.end(), [](auto &w) { return w.worker_thread.joinable(); })) {
if (std::none_of(workers.begin(), workers.end(), [](auto &w) { return w.worker_thread.joinable(); }) &&
std::none_of(tagged_workers.begin(), tagged_workers.end(), [](auto &w) { return std::get<0>(w).worker_thread.joinable(); })) {
// All the workers have finished, so we can finish shutting down
return proxy_quit();
}
@ -616,7 +628,19 @@ bool LokiMQ::proxy_handle_builtin(size_t conn_index, std::vector<zmq::message_t>
}
void LokiMQ::proxy_process_queue() {
// First up: process any batch jobs; since these are internal they are given higher priority.
if (max_workers == 0) // shutting down
return;
// First: send any tagged thread tasks to the tagged threads, if idle
for (auto& [run, busy, queue] : tagged_workers) {
if (!busy && !queue.empty()) {
busy = true;
proxy_run_worker(run.load(std::move(queue.front()), false, run.worker_id));
queue.pop();
}
}
// Second: process any batch jobs; since these are internal they are given higher priority.
proxy_run_batch_jobs(batch_jobs, batch_jobs_reserved, batch_jobs_active, false);
// Next any reply batch jobs (which are a bit different from the above, since they are

View file

@ -5,27 +5,93 @@
namespace lokimq {
void LokiMQ::worker_thread(unsigned int index) {
std::string worker_id = "w" + std::to_string(index);
void LokiMQ::worker_thread(unsigned int index, std::optional<std::string> tagged, std::function<void()> start) {
std::string routing_id = (tagged ? "t" : "w") + std::to_string(index); // for routing
std::string_view worker_id{tagged ? *tagged : routing_id}; // for debug
[[maybe_unused]] std::string thread_name = tagged.value_or("lmq-" + routing_id);
#if defined(__linux__) || defined(__sun) || defined(__MINGW32__)
pthread_setname_np(pthread_self(), ("lmq-" + worker_id).c_str());
if (thread_name.size() > 15) thread_name.resize(15);
pthread_setname_np(pthread_self(), thread_name.c_str());
#elif defined(__FreeBSD__) || defined(__NetBSD__) || defined(__OpenBSD__)
pthread_set_name_np(pthread_self(), ("lmq-" + worker_id).c_str());
pthread_set_name_np(pthread_self(), thread_name.c_str());
#elif defined(__MACH__)
pthread_setname_np(("lmq-" + worker_id).c_str());
pthread_setname_np(thread_name.c_str());
#endif
zmq::socket_t sock{context, zmq::socket_type::dealer};
sock.setsockopt(ZMQ_ROUTING_ID, worker_id.data(), worker_id.size());
LMQ_LOG(debug, "New worker thread ", worker_id, " started");
sock.setsockopt(ZMQ_ROUTING_ID, routing_id.data(), routing_id.size());
LMQ_LOG(debug, "New worker thread ", worker_id, " (", routing_id, ") started");
sock.connect(SN_ADDR_WORKERS);
Message message{*this, 0, AuthLevel::none, ""s};
std::vector<zmq::message_t> parts;
run_info& run = workers[index]; // This contains our first job, and will be updated later with subsequent jobs
bool waiting_for_command;
if (tagged) {
// If we're a tagged worker then we got started up before LokiMQ started, so we need to wait
// for an all-clear signal from LokiMQ first, then we fire our `start` callback, then we can
// start waiting for commands in the main loop further down. (We also can't get the
// reference to our `tagged_workers` element until the main proxy threads is running).
waiting_for_command = true;
while (true) {
LMQ_TRACE("tagged worker ", worker_id, " waiting for startup signal");
recv_message_parts(sock, parts);
if (parts.size() != 1) {
LMQ_LOG(error, "Internal error: worker ", worker_id, " received invalid ", parts.size(), "-part startup msg");
continue;
}
auto command = view(parts[0]);
if (command == "START"sv) {
LMQ_LOG(debug, "Tagged worker ", worker_id, " received start signal, starting up");
if (start) start();
break;
} else if (command == "QUIT"sv) {
LMQ_LOG(debug, "Tagged worker ", worker_id, " shutting down");
detail::send_control(sock, "QUITTING");
sock.setsockopt<int>(ZMQ_LINGER, 1000);
sock.close();
return;
} else {
LMQ_LOG(error, "Internal error: worker ", worker_id, " received invalid command: `", command, "'");
}
}
} else {
// Otherwise for a regular worker we can only be started by an active main proxy thread
// which will have preloaded our first job so we can start off right away.
waiting_for_command = false;
}
// This will always contains the current job, and is guaranteed to never be invalidated.
run_info& run = tagged ? std::get<run_info>(tagged_workers[index - 1]) : workers[index];
while (true) {
while (waiting_for_command) {
LMQ_TRACE("worker ", worker_id, " waiting for command");
parts.clear();
recv_message_parts(sock, parts);
if (parts.size() != 1) {
LMQ_LOG(error, "Internal error: worker ", worker_id, " received invalid ", parts.size(), "-part worker instruction");
continue;
}
auto command = view(parts[0]);
if (command == "RUN"sv) {
LMQ_LOG(debug, "worker ", worker_id, " running ", run.is_batch_job ? "batch"s : "command " + run.command);
waiting_for_command = false;
} else if (command == "QUIT"sv) {
LMQ_LOG(debug, "worker ", worker_id, " shutting down");
detail::send_control(sock, "QUITTING");
sock.setsockopt<int>(ZMQ_LINGER, 1000);
sock.close();
return;
} else {
LMQ_LOG(error, "Internal error: worker ", worker_id, " received invalid command: `", command, "'");
}
}
try {
if (run.is_batch_job) {
if (run.batch_jobno >= 0) {
@ -72,32 +138,9 @@ void LokiMQ::worker_thread(unsigned int index) {
LMQ_LOG(warn, worker_id, " caught non-standard exception when processing command");
}
while (true) {
// Signal that we are ready for another job and wait for it. (We do this down here
// because our first job gets set up when the thread is started).
detail::send_control(sock, "RAN");
LMQ_TRACE("worker ", worker_id, " waiting for requests");
parts.clear();
recv_message_parts(sock, parts);
if (parts.size() != 1) {
LMQ_LOG(error, "Internal error: worker ", worker_id, " received invalid ", parts.size(), "-part worker instruction");
continue;
}
auto command = view(parts[0]);
if (command == "RUN") {
LMQ_LOG(debug, "worker ", worker_id, " running command ", run.command);
break; // proxy has set up a command for us, go back and run it.
} else if (command == "QUIT") {
LMQ_LOG(debug, "worker ", worker_id, " shutting down");
detail::send_control(sock, "QUITTING");
sock.setsockopt<int>(ZMQ_LINGER, 1000);
sock.close();
return;
} else {
LMQ_LOG(error, "Internal error: worker ", worker_id, " received invalid command: `", command, "'");
}
}
// Tell the proxy thread that we are ready for another job
detail::send_control(sock, "RAN");
waiting_for_command = true;
}
}
@ -125,46 +168,66 @@ void LokiMQ::proxy_worker_message(std::vector<zmq::message_t>& parts) {
}
auto route = view(parts[0]), cmd = view(parts[1]);
LMQ_TRACE("worker message from ", route);
assert(route.size() >= 2 && route[0] == 'w' && route[1] >= '0' && route[1] <= '9');
std::string_view worker_id_str{&route[1], route.size()-1}; // Chop off the leading "w"
assert(route.size() >= 2 && (route[0] == 'w' || route[0] == 't') && route[1] >= '0' && route[1] <= '9');
bool tagged_worker = route[0] == 't';
std::string_view worker_id_str{&route[1], route.size()-1}; // Chop off the leading "w" (or "t")
unsigned int worker_id = detail::extract_unsigned(worker_id_str);
if (!worker_id_str.empty() /* didn't consume everything */ || worker_id >= workers.size()) {
if (!worker_id_str.empty() /* didn't consume everything */ ||
(tagged_worker
? 0 == worker_id || worker_id > tagged_workers.size() // tagged worker ids are indexed from 1 to N (0 means untagged)
: worker_id >= workers.size() // regular worker ids are indexed from 0 to N-1
)
) {
LMQ_LOG(error, "Worker id '", route, "' is invalid, unable to process worker command");
return;
}
auto& run = workers[worker_id];
auto& run = tagged_worker ? std::get<run_info>(tagged_workers[worker_id - 1]) : workers[worker_id];
LMQ_TRACE("received ", cmd, " command from ", route);
if (cmd == "RAN") {
LMQ_LOG(debug, "Worker ", route, " finished ", run.command);
if (cmd == "RAN"sv) {
LMQ_TRACE("Worker ", route, " finished ", run.is_batch_job ? "batch job" : run.command);
if (run.is_batch_job) {
auto& jobs = run.is_reply_job ? reply_jobs : batch_jobs;
auto& active = run.is_reply_job ? reply_jobs_active : batch_jobs_active;
assert(active > 0);
active--;
if (tagged_worker) {
std::get<bool>(tagged_workers[worker_id - 1]) = false;
} else {
auto& active = run.is_reply_job ? reply_jobs_active : batch_jobs_active;
assert(active > 0);
active--;
}
bool clear_job = false;
if (run.batch_jobno == -1) {
// Returned from the completion function
clear_job = true;
} else {
auto status = run.batch->job_finished();
if (status == detail::BatchStatus::complete) {
jobs.emplace(run.batch, -1);
} else if (status == detail::BatchStatus::complete_proxy) {
try {
run.batch->job_completion(); // RUN DIRECTLY IN PROXY THREAD
} catch (const std::exception &e) {
// Raise these to error levels: the caller really shouldn't be doing
// anything non-trivial in an in-proxy completion function!
LMQ_LOG(error, "proxy thread caught exception when processing in-proxy completion command: ", e.what());
} catch (...) {
LMQ_LOG(error, "proxy thread caught non-standard exception when processing in-proxy completion command");
auto [state, thread] = run.batch->job_finished();
if (state == detail::BatchState::complete) {
if (thread == -1) { // run directly in proxy
LMQ_TRACE("Completion job running directly in proxy");
try {
run.batch->job_completion(); // RUN DIRECTLY IN PROXY THREAD
} catch (const std::exception &e) {
// Raise these to error levels: the caller really shouldn't be doing
// anything non-trivial in an in-proxy completion function!
LMQ_LOG(error, "proxy thread caught exception when processing in-proxy completion command: ", e.what());
} catch (...) {
LMQ_LOG(error, "proxy thread caught non-standard exception when processing in-proxy completion command");
}
clear_job = true;
} else {
auto& jobs =
thread > 0
? std::get<std::queue<batch_job>>(tagged_workers[thread - 1]) // run in tagged thread
: run.is_reply_job
? reply_jobs
: batch_jobs;
jobs.emplace(run.batch, -1);
}
clear_job = true;
} else if (status == detail::BatchStatus::done) {
} else if (state == detail::BatchState::done) {
// No completion job
clear_job = true;
}
// else the job is still running
}
if (clear_job) {
@ -179,11 +242,11 @@ void LokiMQ::proxy_worker_message(std::vector<zmq::message_t>& parts) {
if (max_workers == 0) { // Shutting down
LMQ_TRACE("Telling worker ", route, " to quit");
route_control(workers_socket, route, "QUIT");
} else {
} else if (!tagged_worker) {
idle_workers.push_back(worker_id);
}
} else if (cmd == "QUITTING") {
workers[worker_id].worker_thread.join();
} else if (cmd == "QUITTING"sv) {
run.worker_thread.join();
LMQ_LOG(debug, "Worker ", route, " exited normally");
} else {
LMQ_LOG(error, "Worker ", route, " sent unknown control message: `", cmd, "'");
@ -192,7 +255,7 @@ void LokiMQ::proxy_worker_message(std::vector<zmq::message_t>& parts) {
void LokiMQ::proxy_run_worker(run_info& run) {
if (!run.worker_thread.joinable())
run.worker_thread = std::thread{&LokiMQ::worker_thread, this, run.worker_id};
run.worker_thread = std::thread{[this, id=run.worker_id] { worker_thread(id); }};
else
send_routed_message(workers_socket, run.worker_routing_id, "RUN");
}

View file

@ -11,6 +11,7 @@ set(LMQ_TEST_SRC
test_encoding.cpp
test_failures.cpp
test_requests.cpp
test_tagged_threads.cpp
)
add_executable(tests ${LMQ_TEST_SRC})

View file

@ -0,0 +1,161 @@
#include "lokimq/batch.h"
#include "common.h"
#include <future>
TEST_CASE("tagged thread init and start functions", "[tagged][init]") {
lokimq::LokiMQ lmq{get_logger(""), LogLevel::trace};
lmq.set_general_threads(2);
lmq.set_batch_threads(2);
auto t_abc = lmq.add_tagged_thread("abc");
std::atomic<bool> init_called = false, initghi_called = false, start_called = false;
auto t_def = lmq.add_tagged_thread("def", [&] { init_called = true; });
auto t_ghi = lmq.add_tagged_thread("def", [&] { initghi_called = true; }, [&] { start_called = false; });
wait_for([&] { return init_called.load() && initghi_called.load(); });
{
auto lock = catch_lock();
REQUIRE_FALSE( start_called );
}
lmq.start();
wait_for([&] { return start_called.load(); });
{
auto lock = catch_lock();
REQUIRE_FALSE( start_called );
}
}
TEST_CASE("batch jobs to tagged threads", "[tagged][batch]") {
lokimq::LokiMQ lmq{get_logger(""), LogLevel::trace};
lmq.set_general_threads(2);
lmq.set_batch_threads(2);
std::thread::id id_abc, id_def;
auto t_abc = lmq.add_tagged_thread("abc", [&] { id_abc = std::this_thread::get_id(); });
auto t_def = lmq.add_tagged_thread("def", [&] { id_def = std::this_thread::get_id(); });
lmq.start();
std::atomic<bool> done = false;
std::thread::id id;
lmq.job([&] { id = std::this_thread::get_id(); done = true; });
wait_for([&] { return done.load(); });
{
auto lock = catch_lock();
REQUIRE( id != id_abc );
REQUIRE( id != id_def );
}
done = false;
lmq.job([&] { id = std::this_thread::get_id(); done = true; }, &t_abc);
wait_for([&] { return done.load(); });
{
auto lock = catch_lock();
REQUIRE( id == id_abc );
}
done = false;
lmq.job([&] { id = std::this_thread::get_id(); done = true; }, &t_def);
wait_for([&] { return done.load(); });
{
auto lock = catch_lock();
REQUIRE( id == id_def );
}
std::atomic<bool> sleep = true;
auto sleeper = [&] { for (int i = 0; sleep && i < 10; i++) { std::this_thread::sleep_for(25ms); } };
lmq.job(sleeper);
lmq.job(sleeper);
// This one should stall:
std::atomic<bool> bad = false;
lmq.job([&] { bad = true; });
std::this_thread::sleep_for(50ms);
done = false;
lmq.job([&] { id = std::this_thread::get_id(); done = true; }, &t_abc);
wait_for([&] { return done.load(); });
{
auto lock = catch_lock();
REQUIRE( done.load() );
REQUIRE_FALSE( bad.load() );
}
done = false;
// We can queue up a bunch of jobs which should all happen in order, and all on the abc thread.
std::vector<int> v;
for (int i = 0; i < 100; i++) {
lmq.job([&] { if (std::this_thread::get_id() == id_abc) v.push_back(v.size()); }, &t_abc);
}
lmq.job([&] { done = true; }, &t_abc);
wait_for([&] { return done.load(); });
{
auto lock = catch_lock();
REQUIRE( done.load() );
REQUIRE_FALSE( bad.load() );
REQUIRE( v.size() == 100 );
for (int i = 0; i < 100; i++)
REQUIRE( v[i] == i );
}
sleep = false;
wait_for([&] { return bad.load(); });
{
auto lock = catch_lock();
REQUIRE( bad.load() );
}
}
TEST_CASE("batch job completion on tagged threads", "[tagged][batch-completion]") {
lokimq::LokiMQ lmq{get_logger(""), LogLevel::trace};
lmq.set_general_threads(4);
lmq.set_batch_threads(4);
std::thread::id id_abc;
auto t_abc = lmq.add_tagged_thread("abc", [&] { id_abc = std::this_thread::get_id(); });
lmq.start();
lokimq::Batch<int> batch;
for (int i = 1; i < 10; i++)
batch.add_job([i, &id_abc]() { if (std::this_thread::get_id() == id_abc) return 0; return i; });
std::atomic<int> result_sum = -1;
batch.completion([&](auto result) {
int sum = 0;
for (auto& r : result)
sum += r.get();
result_sum = std::this_thread::get_id() == id_abc ? sum : -sum;
}, &t_abc);
lmq.batch(std::move(batch));
wait_for([&] { return result_sum.load() != -1; });
{
auto lock = catch_lock();
REQUIRE( result_sum == 45 );
}
}
TEST_CASE("timer job completion on tagged threads", "[tagged][timer]") {
lokimq::LokiMQ lmq{get_logger(""), LogLevel::trace};
lmq.set_general_threads(4);
lmq.set_batch_threads(4);
std::thread::id id_abc;
auto t_abc = lmq.add_tagged_thread("abc", [&] { id_abc = std::this_thread::get_id(); });
lmq.start();
std::atomic<int> ticks = 0;
std::atomic<int> abc_ticks = 0;
lmq.add_timer([&] { ticks++; }, 10ms);
lmq.add_timer([&] { if (std::this_thread::get_id() == id_abc) abc_ticks++; }, 10ms, true, &t_abc);
wait_for([&] { return ticks.load() > 2 && abc_ticks > 2; });
{
auto lock = catch_lock();
REQUIRE( ticks.load() > 2 );
REQUIRE( abc_ticks.load() > 2 );
}
}