lokinet/llarp/threadpool.cpp

93 lines
1.9 KiB
C++
Raw Normal View History

2018-01-29 15:19:00 +01:00
#include "threadpool.hpp"
#include <iostream>
2018-01-29 15:27:24 +01:00
namespace llarp {
namespace thread {
Pool::Pool(size_t workers) {
2018-01-31 20:59:26 +01:00
stop.store(false);
2018-01-29 15:27:24 +01:00
while (workers--) {
threads.emplace_back([this] {
for (;;) {
llarp_thread_job job;
{
lock_t lock(this->queue_mutex);
this->condition.wait(
lock, [this] { return this->stop || !this->jobs.empty(); });
if (this->stop && this->jobs.empty())
return;
job = std::move(this->jobs.front());
this->jobs.pop();
}
// do work
job.work(job.user);
// inform result if needed
2018-01-31 20:59:26 +01:00
if (job.caller)
{
if (!llarp_ev_call_async(job.caller, job.data)) {
2018-01-29 15:27:24 +01:00
std::cerr << "failed to queue result in thread worker" << std::endl;
}
2018-01-31 20:59:26 +01:00
}
2018-01-29 15:19:00 +01:00
}
2018-01-29 15:27:24 +01:00
});
}
}
2018-01-29 15:19:00 +01:00
2018-01-29 15:27:24 +01:00
void Pool::Join() {
{
lock_t lock(queue_mutex);
stop.store(true);
}
condition.notify_all();
for (auto &t : threads)
t.join();
}
2018-01-29 15:19:00 +01:00
2018-01-29 15:27:24 +01:00
void Pool::QueueJob(llarp_thread_job job) {
{
lock_t lock(queue_mutex);
2018-01-29 15:19:00 +01:00
2018-01-29 15:27:24 +01:00
// don't allow enqueueing after stopping the pool
if (stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
jobs.emplace(job);
2018-01-29 15:19:00 +01:00
}
2018-01-29 15:27:24 +01:00
condition.notify_one();
2018-01-29 15:19:00 +01:00
}
2018-01-29 15:27:24 +01:00
} // namespace thread
} // namespace llarp
struct llarp_threadpool {
2018-01-29 15:19:00 +01:00
llarp::thread::Pool impl;
llarp_threadpool(int workers) : impl(workers) {}
};
extern "C" {
2018-01-29 15:27:24 +01:00
struct llarp_threadpool *llarp_init_threadpool(int workers) {
if (workers > 0)
return new llarp_threadpool(workers);
else
return nullptr;
}
2018-01-29 15:19:00 +01:00
2018-01-29 15:27:24 +01:00
void llarp_threadpool_join(struct llarp_threadpool *pool) { pool->impl.Join(); }
2018-01-29 15:19:00 +01:00
2018-01-31 20:59:26 +01:00
void llarp_threadpool_start(struct llarp_threadpool * pool)
{
/** no op */
}
void llarp_threadpool_queue_job(struct llarp_threadpool * pool, struct llarp_thread_job job)
{
pool->impl.QueueJob(job);
}
2018-01-29 15:27:24 +01:00
void llarp_free_threadpool(struct llarp_threadpool **pool) {
delete *pool;
*pool = nullptr;
}
2018-01-29 15:19:00 +01:00
}