This commit is contained in:
Jeff Becker 2018-04-30 14:18:18 -04:00
parent a5611e2206
commit 8aa898aef8
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05
11 changed files with 289 additions and 12 deletions

View File

@ -5,7 +5,7 @@
struct llarp_main {
struct llarp_router *router;
struct llarp_threadpool *worker;
struct llarp_threadpool *logic;
struct llarp_logic *logic;
struct llarp_config *config;
struct llarp_ev_loop *mainloop;
};
@ -40,10 +40,10 @@ int shutdown_llarp(struct llarp_main *m) {
progress();
llarp_threadpool_join(m->worker);
progress();
if (m->logic) llarp_threadpool_wait(m->logic);
progress();
if (m->logic) llarp_threadpool_join(m->logic);
if (m->logic) llarp_logic_stop(m->logic);
progress();
llarp_free_router(&m->router);
progress();
llarp_free_config(&m->config);
@ -52,7 +52,7 @@ int shutdown_llarp(struct llarp_main *m) {
progress();
llarp_free_threadpool(&m->worker);
progress();
if (m->logic) llarp_free_threadpool(&m->logic);
llarp_free_logic(&m->logic);
progress();
printf("\n");
fflush(stdout);
@ -80,7 +80,7 @@ int main(int argc, char *argv[]) {
llarp.router = llarp_init_router(llarp.worker);
if (llarp_configure_router(llarp.router, llarp.config)) {
llarp.logic = llarp_init_threadpool(1);
llarp.logic = llarp_init_logic();
printf("starting router\n");
llarp_run_router(llarp.router, llarp.logic);
printf("running mainloop\n");

View File

@ -4,4 +4,5 @@
#include <llarp/version.h>
#include <llarp/mem.h>
#include <llarp/ev.h>
#include <llarp/logic.h>
#endif

View File

@ -5,6 +5,7 @@
#include <llarp/mem.h>
#include <llarp/msg_handler.h>
#include <llarp/obmd.h>
#include <llarp/logic.h>
#include <stdbool.h>
#include <stdint.h>
@ -65,7 +66,7 @@ struct llarp_link {
int (*register_listener)(struct llarp_link *, struct llarp_link_ev_listener);
void (*deregister_listener)(struct llarp_link *, int);
bool (*configure)(struct llarp_link *, const char *, int, uint16_t);
bool (*start_link)(struct llarp_link *, struct llarp_threadpool *);
bool (*start_link)(struct llarp_link *, struct llarp_logic *);
bool (*stop_link)(struct llarp_link *);
bool (*put_ai)(struct llarp_link *, struct llarp_ai *);
void (*iter_sessions)(struct llarp_link *, struct llarp_link_session_iter);

28
include/llarp/logic.h Normal file
View File

@ -0,0 +1,28 @@
#ifndef LLARP_LOGIC_H
#define LLARP_LOGIC_H
#include <llarp/threadpool.h>
#include <llarp/timer.h>
#ifdef __cplusplus
extern "C" {
#endif
struct llarp_logic;
struct llarp_logic * llarp_init_logic();
void llarp_free_logic(struct llarp_logic ** logic);
void llarp_logic_queue_job(struct llarp_logic * logic, struct llarp_thread_job job);
uint32_t llarp_logic_call_later(struct llarp_logic * logic, struct llarp_timeout_job job);
void llarp_logic_cancel_call(struct llarp_logic * logic, uint32_t id);
void llarp_logic_stop(struct llarp_logic * logic);
void llarp_logic_mainloop(struct llarp_logic * logic);
#ifdef __cplusplus
}
#endif
#endif

View File

@ -5,6 +5,7 @@
#include <llarp/ibmq.h>
#include <llarp/obmd.h>
#include <llarp/threadpool.h>
#include <llarp/logic.h>
#ifdef __cplusplus
extern "C" {
@ -19,7 +20,7 @@ bool llarp_configure_router(struct llarp_router *router,
struct llarp_config *conf);
void llarp_run_router(struct llarp_router *router,
struct llarp_threadpool *logic);
struct llarp_logic *logic);
void llarp_stop_router(struct llarp_router *router);
/** get router's inbound link level frame queue */

View File

@ -1,6 +1,5 @@
#ifndef LLARP_THREADPOOL_H
#define LLARP_THREADPOOL_H
#include <llarp/ev.h>
#ifdef __cplusplus
extern "C" {
#endif
@ -29,6 +28,7 @@ void llarp_threadpool_join(struct llarp_threadpool *tp);
void llarp_threadpool_wait(struct llarp_threadpool *tp);
#ifdef __cplusplus
}
#endif

37
include/llarp/timer.h Normal file
View File

@ -0,0 +1,37 @@
#ifndef LLARP_TIMER_H
#define LLARP_TIMER_H
#include <llarp/common.h>
#include <llarp/threadpool.h>
#ifdef __cplusplus
extern "C" {
#endif
typedef void(*llarp_timer_handler_func)(void *, uint64_t, uint64_t);
struct llarp_timeout_job
{
uint64_t timeout;
void * user;
llarp_timer_handler_func handler;
};
struct llarp_timer_context;
struct llarp_timer_context * llarp_init_timer();
uint32_t llarp_timer_call_later(struct llarp_timer_context * t, struct llarp_timeout_job job);
void llarp_timer_cancel(struct llarp_timer_context * t, uint32_t id);
// cancel all
void llarp_timer_stop(struct llarp_timer_context * t);
// blocking run timer and send events to thread pool
void llarp_timer_run(struct llarp_timer_context * t, struct llarp_threadpool * pool);
void llarp_free_timer(struct llarp_timer_context ** t);
#ifdef __cplusplus
}
#endif
#endif

View File

@ -26,7 +26,7 @@ struct link_impl {
const char *name() { return linkname.c_str(); }
bool start(llarp_threadpool *logic) {
bool start(llarp_logic *logic) {
// todo: implement
return false;
}
@ -43,7 +43,7 @@ static bool configure(struct llarp_link *l, const char *ifname, int af,
return link->configure(ifname, af, port);
}
static bool start_link(struct llarp_link *l, struct llarp_threadpool *logic) {
static bool start_link(struct llarp_link *l, struct llarp_logic *logic) {
link_impl *link = static_cast<link_impl *>(l->impl);
return link->start(logic);
}

57
llarp/logic.c Normal file
View File

@ -0,0 +1,57 @@
#include <llarp/logic.h>
#include <llarp/mem.h>
struct llarp_logic
{
struct llarp_threadpool * thread;
struct llarp_timer_context * timer;
};
struct llarp_logic * llarp_init_logic()
{
struct llarp_logic * logic = llarp_g_mem.alloc(sizeof(struct llarp_logic), 8);
if(logic)
{
logic->thread = llarp_init_threadpool(1);
logic->timer = llarp_init_timer();
}
return logic;
};
void llarp_free_logic(struct llarp_logic ** logic)
{
if(*logic)
{
llarp_free_threadpool(&(*logic)->thread);
llarp_free_timer(&(*logic)->timer);
llarp_g_mem.free(*logic);
*logic = NULL;
}
}
static void llarp_logic_stop_work(void * user)
{
struct llarp_logic * logic = user;
llarp_timer_stop(logic->timer);
}
void llarp_logic_stop(struct llarp_logic * logic)
{
struct llarp_thread_job job = {
.user = logic,
.work = &llarp_logic_stop_work
};
llarp_threadpool_queue_job(logic->thread, job);
llarp_threadpool_stop(logic->thread);
llarp_threadpool_join(logic->thread);
}
void llarp_logic_mainloop(struct llarp_logic * logic)
{
llarp_threadpool_start(logic->thread);
llarp_timer_run(logic->timer, logic->thread);
llarp_threadpool_wait(logic->thread);
}

View File

@ -59,7 +59,7 @@ bool llarp_configure_router(struct llarp_router *router,
}
void llarp_run_router(struct llarp_router *router,
struct llarp_threadpool *logic) {
struct llarp_logic *logic) {
router->ForEachLink([logic](llarp_link *link) {
int result = link->start_link(link, logic);
if (result == -1) printf("link %s failed to start\n", link->name(link));

152
llarp/timer.cpp Normal file
View File

@ -0,0 +1,152 @@
#include <llarp/timer.h>
#include <map>
#include <condition_variable>
#include <atomic>
#include <list>
namespace llarp
{
struct timer
{
static uint64_t now()
{
return std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
}
void * user;
uint64_t started;
uint64_t timeout;
llarp_timer_handler_func func;
timer(uint64_t ms=0, void * _user=nullptr, llarp_timer_handler_func _func=nullptr) :
user(_user),
started(now()),
timeout(ms),
func(_func)
{
}
void operator () () {
if (func)
{
auto ms = now();
auto diff = ms - started;
if (diff >= timeout)
func(user, timeout, 0);
else
func(user, timeout, diff);
}
}
};
};
struct llarp_timer_context
{
std::mutex timersMutex;
std::map<uint32_t, llarp::timer> timers;
std::mutex tickerMutex;
std::condition_variable ticker;
std::chrono::milliseconds nextTickLen = std::chrono::seconds(1);
uint32_t ids = 0;
std::atomic<bool> _run = true;
bool run()
{
return _run.load();
}
void stop()
{
_run.store(false);
}
void cancel(uint32_t id, bool lockit=true)
{
std::unique_lock<std::mutex> * lock = nullptr;
if(lockit)
lock = new std::unique_lock<std::mutex>(timersMutex);
auto itr = timers.find(id);
if(itr != timers.end())
{
itr->second();
timers.erase(id);
}
if(lock) delete lock;
}
uint32_t call_later(void * user, llarp_timer_handler_func func, uint64_t timeout_ms)
{
std::unique_lock<std::mutex> lock(timersMutex);
uint32_t id = ids ++;
timers[id] = llarp::timer(timeout_ms);
return id;
}
void cancel_all()
{
std::unique_lock<std::mutex> lock(timersMutex);
std::list<uint32_t> ids;
for (auto & item : timers)
{
ids.push_back(item.first);
}
for(auto id : ids)
{
cancel(id, false);
}
}
};
extern "C" {
struct llarp_timer_context * llarp_init_timer()
{
return new llarp_timer_context;
}
uint32_t llarp_timer_call_later(struct llarp_timer_context * t, struct llarp_timeout_job job)
{
return t->call_later(job.user, job.handler, job.timeout);
}
void llarp_free_timer(struct llarp_timer_context ** t)
{
if(*t)
delete *t;
*t = nullptr;
}
void llarp_timer_stop(struct llarp_timer_context * t)
{
t->cancel_all();
t->stop();
}
void llarp_timer_cancel(struct llarp_timer_context * t, uint32_t id)
{
t->cancel(id);
}
void llarp_timer_run(struct llarp_timer_context * t, struct llarp_threadpool * pool)
{
std::unique_lock<std::mutex> lock(t->tickerMutex);
while(t->run())
{
auto status = t->ticker.wait_for(lock, t->nextTickLen);
if(status == std::cv_status::no_timeout)
{
// we woke up
}
}
}
}