mirror of https://github.com/oxen-io/lokinet
make timer work
This commit is contained in:
parent
ce8e5088c4
commit
e695906c0c
|
@ -6,6 +6,7 @@ struct llarp_main {
|
|||
struct llarp_alloc mem;
|
||||
struct llarp_router *router;
|
||||
struct llarp_threadpool *worker;
|
||||
struct llarp_threadpool *netio;
|
||||
struct llarp_logic *logic;
|
||||
struct llarp_config *config;
|
||||
struct llarp_nodedb *nodedb;
|
||||
|
@ -47,6 +48,13 @@ int shutdown_llarp(struct llarp_main *m) {
|
|||
progress();
|
||||
if(m->mainloop)
|
||||
llarp_ev_loop_stop(m->mainloop);
|
||||
|
||||
progress();
|
||||
if(m->netio)
|
||||
{
|
||||
llarp_threadpool_stop(m->netio);
|
||||
llarp_threadpool_join(m->netio);
|
||||
}
|
||||
|
||||
progress();
|
||||
if(m->worker)
|
||||
|
@ -56,7 +64,7 @@ int shutdown_llarp(struct llarp_main *m) {
|
|||
|
||||
if(m->worker)
|
||||
llarp_threadpool_join(m->worker);
|
||||
|
||||
|
||||
progress();
|
||||
if (m->logic) llarp_logic_stop(m->logic);
|
||||
|
||||
|
@ -90,10 +98,17 @@ struct llarp_main llarp = {
|
|||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
{0},
|
||||
1
|
||||
};
|
||||
|
||||
void run_netio(void * user)
|
||||
{
|
||||
struct llarp_ev_loop * loop = user;
|
||||
llarp_ev_loop_run(loop);
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[]) {
|
||||
const char *conffname = "daemon.ini";
|
||||
if (argc > 1) conffname = argv[1];
|
||||
|
@ -116,6 +131,8 @@ int main(int argc, char *argv[]) {
|
|||
if (llarp_nodedb_ensure_dir(dir)) {
|
||||
// ensure worker thread pool
|
||||
if (!llarp.worker) llarp.worker = llarp_init_threadpool(2);
|
||||
// ensire net io thread
|
||||
llarp.netio = llarp_init_threadpool(1);
|
||||
|
||||
llarp.router = llarp_init_router(mem, llarp.worker, llarp.mainloop);
|
||||
|
||||
|
@ -123,12 +140,16 @@ int main(int argc, char *argv[]) {
|
|||
|
||||
llarp.logic = llarp_init_logic(mem);
|
||||
printf("starting router\n");
|
||||
|
||||
llarp_run_router(llarp.router, llarp.logic);
|
||||
|
||||
printf("running mainloop\n");
|
||||
// run io loop
|
||||
struct llarp_thread_job netjob = {
|
||||
.user = llarp.mainloop,
|
||||
.work = &run_netio
|
||||
};
|
||||
llarp_threadpool_queue_job(llarp.netio, netjob);
|
||||
printf("running\n");
|
||||
llarp.exitcode = 0;
|
||||
llarp_ev_loop_run(llarp.mainloop);
|
||||
llarp_logic_mainloop(llarp.logic);
|
||||
} else
|
||||
printf("Failed to configure router\n");
|
||||
} else
|
||||
|
|
|
@ -171,6 +171,7 @@ struct server
|
|||
void cleanup_dead()
|
||||
{
|
||||
// todo: implement
|
||||
printf("cleanup dead\n");
|
||||
}
|
||||
|
||||
bool ensure_privkey()
|
||||
|
|
|
@ -12,34 +12,41 @@ struct timer {
|
|||
.count();
|
||||
}
|
||||
|
||||
llarp_timer_context * parent;
|
||||
void* user;
|
||||
uint64_t started;
|
||||
uint64_t timeout;
|
||||
llarp_timer_handler_func func;
|
||||
uint32_t id;
|
||||
|
||||
timer(uint64_t ms = 0, void* _user = nullptr,
|
||||
llarp_timer_handler_func _func = nullptr)
|
||||
: user(_user), started(now()), timeout(ms), func(_func) {}
|
||||
timer(llarp_timer_context * ctx=nullptr, uint64_t ms = 0, void* _user = nullptr,
|
||||
llarp_timer_handler_func _func = nullptr, uint32_t _id=0)
|
||||
: parent(ctx), user(_user), started(now()), timeout(ms), func(_func), id(_id) {}
|
||||
|
||||
void operator()() {
|
||||
if (func) {
|
||||
auto ms = now();
|
||||
auto diff = ms - started;
|
||||
if (diff >= timeout)
|
||||
func(user, timeout, 0);
|
||||
else
|
||||
func(user, timeout, diff);
|
||||
}
|
||||
|
||||
void exec();
|
||||
|
||||
|
||||
static void call(void * user)
|
||||
{
|
||||
static_cast<timer *>(user)->exec();
|
||||
}
|
||||
|
||||
operator llarp_thread_job ()
|
||||
{
|
||||
return {this, timer::call};
|
||||
}
|
||||
|
||||
};
|
||||
}; // namespace llarp
|
||||
|
||||
struct llarp_timer_context {
|
||||
llarp_threadpool * threadpool;
|
||||
std::mutex timersMutex;
|
||||
std::map<uint32_t, llarp::timer> timers;
|
||||
std::mutex tickerMutex;
|
||||
std::condition_variable ticker;
|
||||
std::chrono::milliseconds nextTickLen = std::chrono::seconds(1);
|
||||
std::chrono::milliseconds nextTickLen = std::chrono::milliseconds(10);
|
||||
|
||||
uint32_t ids = 0;
|
||||
std::atomic<bool> _run = true;
|
||||
|
@ -54,18 +61,23 @@ struct llarp_timer_context {
|
|||
|
||||
auto itr = timers.find(id);
|
||||
if (itr != timers.end()) {
|
||||
itr->second();
|
||||
timers.erase(id);
|
||||
itr->second.exec();
|
||||
}
|
||||
|
||||
if (lock) delete lock;
|
||||
}
|
||||
|
||||
void remove(uint32_t id)
|
||||
{
|
||||
std::unique_lock<std::mutex> lock (timersMutex);
|
||||
timers.erase(id);
|
||||
}
|
||||
|
||||
uint32_t call_later(void* user, llarp_timer_handler_func func,
|
||||
uint64_t timeout_ms) {
|
||||
std::unique_lock<std::mutex> lock(timersMutex);
|
||||
uint32_t id = ++ids;
|
||||
timers[id] = llarp::timer(timeout_ms);
|
||||
timers[id] = llarp::timer(this, timeout_ms, user, func, id);
|
||||
return id;
|
||||
}
|
||||
|
||||
|
@ -111,12 +123,39 @@ void llarp_timer_cancel(struct llarp_timer_context* t, uint32_t id) {
|
|||
|
||||
void llarp_timer_run(struct llarp_timer_context* t,
|
||||
struct llarp_threadpool* pool) {
|
||||
std::unique_lock<std::mutex> lock(t->tickerMutex);
|
||||
t->threadpool = pool;
|
||||
while (t->run()) {
|
||||
auto status = t->ticker.wait_for(lock, t->nextTickLen);
|
||||
if (status == std::cv_status::no_timeout) {
|
||||
// we woke up
|
||||
std::unique_lock<std::mutex> lock(t->tickerMutex);
|
||||
t->ticker.wait_for(lock, t->nextTickLen);
|
||||
// we woke up
|
||||
auto now = llarp::timer::now();
|
||||
auto itr = t->timers.begin();
|
||||
while (itr != t->timers.end())
|
||||
{
|
||||
if(now - itr->second.started >= itr->second.timeout)
|
||||
{
|
||||
// timer hit
|
||||
llarp_threadpool_queue_job(pool, itr->second);
|
||||
}
|
||||
++itr;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
namespace llarp
|
||||
{
|
||||
void timer::exec()
|
||||
{
|
||||
if (func) {
|
||||
auto ms = now();
|
||||
auto diff = ms - started;
|
||||
if (diff >= timeout)
|
||||
func(user, timeout, 0);
|
||||
else
|
||||
func(user, timeout, diff);
|
||||
}
|
||||
if(parent)
|
||||
parent->remove(id);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue