iot seems that logic thread didn't work the way i remember it should

make logic work the way it should
This commit is contained in:
Jeff Becker 2019-11-14 10:06:53 -05:00
parent 3c8e148372
commit f16c9f9b5d
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05
8 changed files with 193 additions and 197 deletions

View File

@ -44,7 +44,7 @@ llarp_ev_loop_run_single_process(llarp_ev_loop_ptr ev,
if(ev->running())
{
ev->update_time();
logic->tick_async(ev->time_now());
logic->tick(ev->time_now());
}
llarp::LogContext::Instance().logStream->Tick(ev->time_now());
}

View File

@ -1,88 +1,74 @@
#include <util/thread/logic.hpp>
#include <util/thread/timer.hpp>
#include <util/logging/logger.hpp>
#include <util/mem.h>
#include <future>
namespace llarp
{
void
Logic::tick(llarp_time_t now)
{
llarp_timer_set_time(this->timer, now);
llarp_timer_tick_all(this->timer);
llarp_threadpool_tick(this->thread);
llarp_timer_tick_all_async(m_Timer, m_Thread, now);
}
Logic::Logic()
: thread(llarp_init_threadpool(1, "llarp-logic"))
, timer(llarp_init_timer())
: m_Thread(llarp_init_threadpool(1, "llarp-logic"))
, m_Timer(llarp_init_timer())
{
llarp_threadpool_start(thread);
llarp_threadpool_start(m_Thread);
/// set thread id
thread->impl->addJob([&]() { id.emplace(std::this_thread::get_id()); });
std::promise< ID_t > result;
// queue setting id and try to get the result back
llarp_threadpool_queue_job(m_Thread, [&]() {
m_ID.emplace(std::this_thread::get_id());
result.set_value(m_ID.value());
});
// get the result back
ID_t spawned = result.get_future().get();
LogDebug("logic thread spawned on ", spawned);
}
Logic::~Logic()
{
llarp_threadpool_stop(this->thread);
llarp_threadpool_join(this->thread);
llarp_free_threadpool(&this->thread);
delete m_Thread;
llarp_free_timer(m_Timer);
}
void
Logic::tick_async(llarp_time_t now)
{
llarp_timer_tick_all_async(this->timer, this->thread, now);
}
void
Logic::stop_timer()
{
llarp_timer_stop(this->timer);
}
void
bool
Logic::queue_job(struct llarp_thread_job job)
{
if(job.user && job.work)
queue_func(std::bind(job.work, job.user));
return job.user && job.work && queue_func(std::bind(job.work, job.user));
}
void
Logic::stop()
{
llarp::LogDebug("logic thread stop");
if(this->thread)
{
llarp_threadpool_stop(this->thread);
}
llarp::LogDebug("logic timer stop");
if(this->timer)
llarp_timer_stop(this->timer);
}
void
Logic::mainloop()
{
llarp_timer_run(this->timer, this->thread);
// stop all timers from happening in the future
queue_func(std::bind(&llarp_timer_stop, m_Timer));
// stop all operations on threadpool
llarp_threadpool_stop(m_Thread);
}
bool
Logic::queue_func(std::function< void(void) >&& f)
Logic::queue_func(std::function< void(void) > f)
{
if(can_flush() && thread->impl->capacity() == thread->impl->capacity())
if(can_flush() and m_Thread->LooksFull())
{
call_later(5, f);
// we are calling in the logic thread and our queue looks full
// defer call to a later time so we don't die like a little bitch
call_later(m_Thread->GuessJobLatency() / 2, f);
return true;
}
return thread->impl->addJob(f);
return llarp_threadpool_queue_job(m_Thread, f);
}
void
Logic::call_later(llarp_time_t timeout, std::function< void(void) > func)
{
llarp_timer_call_func_later(this->timer, timeout, func);
llarp_timer_call_func_later(m_Timer, timeout, func);
}
uint32_t
@ -92,25 +78,25 @@ namespace llarp
j.user = job.user;
j.timeout = job.timeout;
j.handler = job.handler;
return llarp_timer_call_later(this->timer, j);
return llarp_timer_call_later(m_Timer, j);
}
void
Logic::cancel_call(uint32_t id)
{
llarp_timer_cancel_job(this->timer, id);
llarp_timer_cancel_job(m_Timer, id);
}
void
Logic::remove_call(uint32_t id)
{
llarp_timer_remove_job(this->timer, id);
llarp_timer_remove_job(m_Timer, id);
}
bool
Logic::can_flush() const
{
return id.value() == std::this_thread::get_id();
return m_ID.value() == std::this_thread::get_id();
}
} // namespace llarp

View File

@ -11,36 +11,23 @@ namespace llarp
class Logic
{
public:
struct llarp_threadpool* thread;
struct llarp_timer_context* timer;
absl::optional< std::thread::id > id;
Logic();
~Logic();
/// single threaded tick
/// trigger times as needed
void
tick(llarp_time_t now);
/// isolated tick
void
tick_async(llarp_time_t now);
void
stop_timer();
/// stop all operation and wait for that to die
void
stop();
void
mainloop();
void
bool
queue_job(struct llarp_thread_job job);
bool
queue_func(std::function< void(void) >&& func);
queue_func(std::function< void(void) > func);
uint32_t
call_later(const llarp_timeout_job& job);
@ -56,6 +43,12 @@ namespace llarp
bool
can_flush() const;
private:
using ID_t = std::thread::id;
llarp_threadpool* const m_Thread;
llarp_timer_context* const m_Timer;
absl::optional< ID_t > m_ID;
};
} // namespace llarp

View File

@ -15,18 +15,13 @@ llarp_init_threadpool(int workers, const char *name)
return new llarp_threadpool(workers, name);
}
struct llarp_threadpool *
llarp_init_same_process_threadpool()
{
return new llarp_threadpool();
}
void
llarp_threadpool_join(struct llarp_threadpool *pool)
{
llarp::LogDebug("threadpool join");
if(pool->impl)
pool->impl->drain();
pool->impl->stop();
pool->impl.reset();
}
void
@ -41,64 +36,29 @@ llarp_threadpool_stop(struct llarp_threadpool *pool)
{
llarp::LogDebug("threadpool stop");
if(pool->impl)
pool->impl->stop();
if(pool->jobs)
pool->jobs->disable();
pool->impl->disable();
}
void
llarp_threadpool_wait(struct llarp_threadpool *pool)
{
llarp::LogDebug("threadpool wait");
if(pool->impl)
{
pool->impl->drain();
}
}
void
bool
llarp_threadpool_queue_job(struct llarp_threadpool *pool,
struct llarp_thread_job job)
{
return llarp_threadpool_queue_job(pool, (std::bind(job.work, job.user)));
return llarp_threadpool_queue_job(pool, std::bind(job.work, job.user));
}
void
bool
llarp_threadpool_queue_job(struct llarp_threadpool *pool,
std::function< void() > func)
std::function< void(void) > func)
{
if(pool->impl)
{
while(!pool->impl->tryAddJob(func))
{
std::this_thread::sleep_for(std::chrono::microseconds(1000));
}
}
else
{
// single threaded mode
while(pool->jobs->tryPushBack(func) != llarp::thread::QueueReturn::Success)
{
if(!pool->jobs->enabled())
return;
if(::getpid() == pool->callingPID)
llarp_threadpool_tick(pool);
else
std::this_thread::sleep_for(std::chrono::microseconds(1000));
}
}
return pool->impl && pool->impl->addJob(func);
}
void
llarp_threadpool_tick(struct llarp_threadpool *pool)
{
while(pool->size())
if(pool->impl)
{
auto job = pool->jobs->tryPopFront();
if(job)
{
(*job)();
}
pool->impl->drain();
}
}
@ -111,3 +71,47 @@ llarp_free_threadpool(struct llarp_threadpool **pool)
}
*pool = nullptr;
}
size_t
llarp_threadpool::size() const
{
return impl ? impl->capacity() : 0;
}
size_t
llarp_threadpool::pendingJobs() const
{
return impl ? impl->jobCount() : 0;
}
size_t
llarp_threadpool::numThreads() const
{
return impl ? impl->activeThreadCount() : 0;
}
llarp_time_t
llarp_threadpool::GuessJobLatency(llarp_time_t granularity) const
{
static const llarp_time_t minimum = llarp_time_t{10};
granularity = std::max(granularity, minimum);
const llarp_time_t _jobs = llarp_time_t{pendingJobs()} * granularity;
const llarp_time_t _capacity =
std::max(llarp_time_t{size()} * granularity, granularity);
const llarp_time_t _numthreads =
std::max(llarp_time_t{numThreads()} * granularity, granularity);
// divisor = log10(granularity)
llarp_time_t divisor = 0;
do
{
granularity /= 10;
if(granularity > 0)
divisor++;
} while(granularity > 0);
// granulairuty is minimum of 10 so log10(granulairuty) is never 0
divisor *= divisor;
// job lag is pending number of jobs divided by job queue length per thread
// divided by log10(granularity) sqaured
const llarp_time_t _queue_length = _capacity / _numthreads;
return _jobs / _queue_length / divisor;
}

View File

@ -5,48 +5,50 @@
#include <util/thread/queue.hpp>
#include <util/thread/thread_pool.hpp>
#include <util/thread/threading.hpp>
#include <util/types.hpp>
#include <absl/base/thread_annotations.h>
#include <memory>
#include <queue>
struct llarp_threadpool;
#ifdef __cplusplus
struct llarp_threadpool
{
std::unique_ptr< llarp::thread::ThreadPool > impl;
std::unique_ptr< llarp::thread::Queue< std::function< void(void) > > > jobs;
const pid_t callingPID;
llarp_threadpool(int workers, llarp::string_view name)
: impl(std::make_unique< llarp::thread::ThreadPool >(workers,
workers * 128, name))
, jobs(nullptr)
, callingPID(0)
llarp_threadpool(int workers, llarp::string_view name,
size_t queueLength = size_t{1024})
: impl(std::make_unique< llarp::thread::ThreadPool >(
workers, std::max(queueLength, size_t{32}), name))
{
}
llarp_threadpool()
: jobs(new llarp::thread::Queue< std::function< void() > >(128))
, callingPID(llarp::util::GetPid())
{
jobs->enable();
}
size_t
size() const
size() const;
size_t
pendingJobs() const;
size_t
numThreads() const;
/// try to guess how big our job latency is on this threadpool
llarp_time_t
GuessJobLatency(llarp_time_t granulairty = 1000) const;
bool
LooksFull() const
{
if(jobs)
return jobs->size();
return 0;
return pendingJobs() >= size();
}
};
#endif
struct llarp_threadpool *
llarp_init_threadpool(int workers, const char *name);
/// for single process mode
struct llarp_threadpool *
llarp_init_same_process_threadpool();
void
llarp_free_threadpool(struct llarp_threadpool **tp);
@ -55,44 +57,41 @@ using llarp_thread_work_func = void (*)(void *);
/** job to be done in worker thread */
struct llarp_thread_job
{
#ifdef __cplusplus
/** user data to pass to work function */
void *user{nullptr};
/** called in threadpool worker thread */
llarp_thread_work_func work{nullptr};
#ifdef __cplusplus
llarp_thread_job(void *u, llarp_thread_work_func w) : user(u), work(w)
{
}
llarp_thread_job() = default;
#else
void *user;
llarp_thread_work_func work;
#endif
};
/// for single process mode
void
llarp_threadpool_tick(struct llarp_threadpool *tp);
void
bool
llarp_threadpool_queue_job(struct llarp_threadpool *tp,
struct llarp_thread_job j);
#ifdef __cplusplus
void
bool
llarp_threadpool_queue_job(struct llarp_threadpool *tp,
std::function< void() > func);
std::function< void(void) > func);
#endif
void
llarp_threadpool_start(struct llarp_threadpool *tp);
void
llarp_threadpool_stop(struct llarp_threadpool *tp);
void
llarp_threadpool_join(struct llarp_threadpool *tp);
void
llarp_threadpool_wait(struct llarp_threadpool *tp);
#endif

View File

@ -1,5 +1,5 @@
#include <util/thread/timer.hpp>
#include <util/logging/logger.hpp>
#include <util/time.hpp>
#include <atomic>
@ -65,6 +65,9 @@ struct llarp_timer_context
absl::Duration nextTickLen = absl::Milliseconds(100);
llarp_time_t m_Now;
llarp_time_t m_NextRequiredTickAt =
std::numeric_limits< llarp_time_t >::max();
size_t m_NumPendingTimers;
llarp_timer_context()
{
@ -118,46 +121,52 @@ struct llarp_timer_context
const uint32_t id = ++currentId;
timers.emplace(
id, std::make_unique< llarp::timer >(m_Now, timeout_ms, user, func));
m_NextRequiredTickAt = std::min(m_NextRequiredTickAt, m_Now + timeout_ms);
m_NumPendingTimers = timers.size();
return id;
}
uint32_t
call_func_later(std::function< void(void) > func, llarp_time_t timeout)
call_func_later(std::function< void(void) > func, llarp_time_t timeout_ms)
{
llarp::util::Lock lock(&timersMutex);
const uint32_t id = ++currentId;
timers.emplace(
id, std::make_unique< llarp::timer >(m_Now, timeout, nullptr, nullptr));
id,
std::make_unique< llarp::timer >(m_Now, timeout_ms, nullptr, nullptr));
timers[id]->deferredFunc = func;
m_NextRequiredTickAt = std::min(m_NextRequiredTickAt, m_Now + timeout_ms);
m_NumPendingTimers = timers.size();
return id;
}
void
cancel_all() LOCKS_EXCLUDED(timersMutex)
{
std::list< uint32_t > ids;
{
llarp::util::Lock lock(&timersMutex);
for(auto& item : timers)
{
ids.push_back(item.first);
item.second->func = nullptr;
item.second->canceled = true;
}
}
}
for(auto id : ids)
{
cancel(id);
}
bool
ShouldTriggerTimers(llarp_time_t peekAhead) const
{
return m_NumPendingTimers > 0
and (m_Now + peekAhead) >= m_NextRequiredTickAt;
}
};
struct llarp_timer_context*
llarp_init_timer()
{
return new llarp_timer_context;
return new llarp_timer_context();
}
uint32_t
@ -175,11 +184,9 @@ llarp_timer_call_func_later(struct llarp_timer_context* t, llarp_time_t timeout,
}
void
llarp_free_timer(struct llarp_timer_context** t)
llarp_free_timer(struct llarp_timer_context* t)
{
if(*t)
delete *t;
*t = nullptr;
delete t;
}
void
@ -191,6 +198,7 @@ llarp_timer_remove_job(struct llarp_timer_context* t, uint32_t id)
void
llarp_timer_stop(struct llarp_timer_context* t)
{
llarp::LogDebug("timers stopping");
// destroy all timers
// don't call callbacks on timers
{
@ -200,6 +208,7 @@ llarp_timer_stop(struct llarp_timer_context* t)
}
if(t->ticker)
t->ticker->SignalAll();
llarp::LogDebug("timers stopped");
}
void
@ -236,31 +245,39 @@ llarp_timer_tick_all(struct llarp_timer_context* t)
itr = t->timers.erase(itr);
}
else
{
++itr;
}
}
}
for(const auto& h : hit)
while(not hit.empty())
{
if(h->func)
{
h->called_at = t->m_Now;
h->exec();
}
const auto& h = hit.front();
h->called_at = t->m_Now;
h->exec();
hit.pop_front();
}
// reindex next tick info
{
llarp::util::Lock lock(&t->timersMutex);
t->m_Now = llarp::time_now_ms();
t->m_NextRequiredTickAt = std::numeric_limits< llarp_time_t >::max();
for(const auto& item : t->timers)
{
t->m_NextRequiredTickAt =
std::min(t->m_NextRequiredTickAt, item.second->timeout + t->m_Now);
}
t->m_NumPendingTimers = t->timers.size();
}
}
static void
llarp_timer_tick_all_job(void* user)
{
llarp_timer_tick_all(static_cast< llarp_timer_context* >(user));
}
void
llarp_timer_tick_all_async(struct llarp_timer_context* t,
struct llarp_threadpool* pool, llarp_time_t now)
{
t->m_Now = now;
llarp_threadpool_queue_job(pool, {t, llarp_timer_tick_all_job});
llarp_timer_set_time(t, now);
if(t->ShouldTriggerTimers(pool->GuessJobLatency()))
llarp_threadpool_queue_job(pool, std::bind(&llarp_timer_tick_all, t));
}
void
@ -302,8 +319,9 @@ namespace llarp
else
call(user, timeout, diff);
}
if(deferredFunc)
if(deferredFunc && not canceled)
deferredFunc();
done = true;
deferredFunc = nullptr;
done = true;
}
} // namespace llarp

View File

@ -60,6 +60,6 @@ llarp_timer_tick_all_async(struct llarp_timer_context *t,
struct llarp_threadpool *pool, llarp_time_t now);
void
llarp_free_timer(struct llarp_timer_context **t);
llarp_free_timer(struct llarp_timer_context *t);
#endif

View File

@ -120,11 +120,13 @@ struct LinkLayerTest : public test::LlarpTest< llarp::sodium::CryptoLibSodium >
Context Bob;
bool success = false;
const bool shouldDebug = false;
llarp_ev_loop_ptr netLoop;
std::shared_ptr< Logic > m_logic;
llarp_time_t oldRCLifetime;
llarp::LogLevel oldLevel;
LinkLayerTest() : netLoop(nullptr)
{
@ -133,6 +135,9 @@ struct LinkLayerTest : public test::LlarpTest< llarp::sodium::CryptoLibSodium >
void
SetUp()
{
oldLevel = llarp::LogContext::Instance().minLevel;
if(shouldDebug)
llarp::SetLogLevel(eLogDebug);
oldRCLifetime = RouterContact::Lifetime;
RouterContact::BlockBogons = false;
RouterContact::Lifetime = 500;
@ -151,32 +156,23 @@ struct LinkLayerTest : public test::LlarpTest< llarp::sodium::CryptoLibSodium >
netLoop.reset();
RouterContact::BlockBogons = true;
RouterContact::Lifetime = oldRCLifetime;
}
static void
OnTimeout(void* u, uint64_t, uint64_t left)
{
if(left)
return;
llarp::LogInfo("timed out test");
static_cast< LinkLayerTest* >(u)->Stop();
llarp::SetLogLevel(oldLevel);
}
void
RunMainloop()
{
m_logic->call_later({5000, this, &OnTimeout});
m_logic->call_later(5000, std::bind(&LinkLayerTest::Stop, this));
llarp_ev_loop_run_single_process(netLoop, m_logic);
}
void
Stop()
{
m_logic->queue_func([&]() {
Alice.Stop();
Bob.Stop();
llarp_ev_loop_stop(netLoop);
});
Alice.Stop();
Bob.Stop();
llarp_ev_loop_stop(netLoop);
m_logic->stop();
}
bool