mirror of https://github.com/oxen-io/lokinet
have logic and netio run in same thread for now
This commit is contained in:
parent
08b60a9ae7
commit
d011fb550e
|
@ -116,41 +116,11 @@ main(int argc, char *argv[])
|
|||
// Configure intercept
|
||||
dnsd.intercept = &hookChecker;
|
||||
|
||||
// singlethreaded
|
||||
if(0)
|
||||
{
|
||||
llarp::LogInfo("singlethread start");
|
||||
worker = llarp_init_same_process_threadpool();
|
||||
logic = llarp_init_single_process_logic(worker);
|
||||
llarp_ev_loop_run_single_process(netloop, worker, logic);
|
||||
llarp::LogInfo("singlethread end");
|
||||
}
|
||||
else
|
||||
{
|
||||
uint num_llarpworkers = 2;
|
||||
uint num_nethreads = 8;
|
||||
llarp::LogInfo("multithreaded start with ", num_llarpworkers,
|
||||
" llarp-workers and ", num_nethreads, " networkers");
|
||||
// create workers
|
||||
worker = llarp_init_threadpool(num_llarpworkers, "llarp-worker");
|
||||
logic = llarp_init_logic();
|
||||
auto netio = netloop;
|
||||
std::vector< std::thread > netio_threads;
|
||||
while(num_nethreads--)
|
||||
{
|
||||
netio_threads.emplace_back([netio]() { llarp_ev_loop_run(netio); });
|
||||
#if(__APPLE__ && __MACH__)
|
||||
|
||||
#elif(__FreeBSD__) || (__OpenBSD__) || (__NetBSD__)
|
||||
pthread_set_name_np(netio_threads.back().native_handle(),
|
||||
"llarp-netio");
|
||||
#else
|
||||
pthread_setname_np(netio_threads.back().native_handle(), "llarp-netio");
|
||||
#endif
|
||||
}
|
||||
llarp_logic_mainloop(logic);
|
||||
llarp::LogInfo("multithreaded end");
|
||||
}
|
||||
llarp::LogInfo("singlethread start");
|
||||
worker = llarp_init_same_process_threadpool();
|
||||
logic = llarp_init_single_process_logic(worker);
|
||||
llarp_ev_loop_run_single_process(netloop, worker, logic);
|
||||
llarp::LogInfo("singlethread end");
|
||||
llarp_ev_loop_free(&netloop);
|
||||
}
|
||||
else
|
||||
|
|
|
@ -107,9 +107,13 @@ namespace llarp
|
|||
bool
|
||||
IsZero() const
|
||||
{
|
||||
AlignedBuffer< sz > b;
|
||||
b.Zero();
|
||||
return memcmp(l, b.l, sz) == 0;
|
||||
size_t idx = sz / 8;
|
||||
while(idx)
|
||||
{
|
||||
if(l[idx--])
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void
|
||||
|
|
|
@ -33,7 +33,7 @@ llarp_ev_loop_free(struct llarp_ev_loop **ev);
|
|||
|
||||
/// run main loop
|
||||
int
|
||||
llarp_ev_loop_run(struct llarp_ev_loop *ev);
|
||||
llarp_ev_loop_run(struct llarp_ev_loop *ev, struct llarp_logic *logic);
|
||||
|
||||
void
|
||||
llarp_ev_loop_run_single_process(struct llarp_ev_loop *ev,
|
||||
|
|
|
@ -10,12 +10,12 @@
|
|||
|
||||
struct llarp_link
|
||||
{
|
||||
/*
|
||||
typedef std::mutex mtx_t;
|
||||
typedef std::unique_lock< mtx_t > lock_t;
|
||||
/*
|
||||
*/
|
||||
typedef llarp::util::DummyMutex mtx_t;
|
||||
typedef llarp::util::DummyLock lock_t;
|
||||
*/
|
||||
|
||||
llarp_router *router;
|
||||
llarp_crypto *crypto;
|
||||
|
|
|
@ -187,22 +187,8 @@ namespace llarp
|
|||
}
|
||||
else
|
||||
{
|
||||
auto netio = mainloop;
|
||||
while(num_nethreads--)
|
||||
{
|
||||
netio_threads.emplace_back([netio]() { llarp_ev_loop_run(netio); });
|
||||
#if(__APPLE__ && __MACH__)
|
||||
|
||||
#elif(__FreeBSD__) || (__OpenBSD__) || (__NetBSD__)
|
||||
pthread_set_name_np(netio_threads.back().native_handle(),
|
||||
"llarp-"
|
||||
"netio");
|
||||
#else
|
||||
pthread_setname_np(netio_threads.back().native_handle(), "llarp-netio");
|
||||
#endif
|
||||
}
|
||||
llarp::LogInfo("running mainloop");
|
||||
llarp_logic_mainloop(logic);
|
||||
return llarp_ev_loop_run(mainloop, logic);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
|
11
llarp/ev.cpp
11
llarp/ev.cpp
|
@ -35,9 +35,16 @@ llarp_ev_loop_free(struct llarp_ev_loop **ev)
|
|||
}
|
||||
|
||||
int
|
||||
llarp_ev_loop_run(struct llarp_ev_loop *ev)
|
||||
llarp_ev_loop_run(struct llarp_ev_loop *ev, struct llarp_logic *logic)
|
||||
{
|
||||
return ev->run();
|
||||
while(true)
|
||||
{
|
||||
if(ev->tick(10) == -1)
|
||||
break;
|
||||
llarp_logic_tick(logic);
|
||||
llarp_threadpool_tick(logic->thread);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
void
|
||||
|
|
|
@ -57,6 +57,7 @@ struct llarp_timer_context
|
|||
{
|
||||
std::mutex timersMutex;
|
||||
std::unordered_map< uint32_t, llarp::timer* > timers;
|
||||
std::priority_queue< llarp::timer* > calling;
|
||||
std::mutex tickerMutex;
|
||||
std::condition_variable* ticker = nullptr;
|
||||
std::chrono::milliseconds nextTickLen = std::chrono::milliseconds(100);
|
||||
|
@ -182,13 +183,12 @@ typedef std::priority_queue< llarp::timer* > timers_t;
|
|||
static void
|
||||
call_timers(void* user)
|
||||
{
|
||||
timers_t* t = static_cast< timers_t* >(user);
|
||||
while(t->size())
|
||||
llarp_timer_context* t = static_cast< llarp_timer_context* >(user);
|
||||
while(t->calling.size())
|
||||
{
|
||||
t->top()->exec();
|
||||
t->pop();
|
||||
t->calling.top()->exec();
|
||||
t->calling.pop();
|
||||
}
|
||||
delete t;
|
||||
}
|
||||
|
||||
void
|
||||
|
@ -197,9 +197,8 @@ llarp_timer_tick_all(struct llarp_timer_context* t,
|
|||
{
|
||||
if(!t->run())
|
||||
return;
|
||||
auto now = llarp_time_now_ms();
|
||||
auto itr = t->timers.begin();
|
||||
timers_t* calling = new timers_t();
|
||||
auto now = llarp_time_now_ms();
|
||||
auto itr = t->timers.begin();
|
||||
while(itr != t->timers.end())
|
||||
{
|
||||
if(now - itr->second->started >= itr->second->timeout
|
||||
|
@ -209,26 +208,17 @@ llarp_timer_tick_all(struct llarp_timer_context* t,
|
|||
{
|
||||
// timer hit
|
||||
itr->second->called_at = now;
|
||||
calling->push(itr->second);
|
||||
++itr;
|
||||
}
|
||||
else if(itr->second->done)
|
||||
{
|
||||
// remove timer
|
||||
itr->second->exec();
|
||||
llarp::timer* timer = itr->second;
|
||||
itr = t->timers.erase(itr);
|
||||
delete timer;
|
||||
continue;
|
||||
}
|
||||
else
|
||||
++itr;
|
||||
}
|
||||
else // timer not hit yet
|
||||
++itr;
|
||||
++itr;
|
||||
}
|
||||
if(calling->size())
|
||||
llarp_threadpool_queue_job(pool, {calling, &call_timers});
|
||||
else
|
||||
delete calling;
|
||||
if(t->calling.size())
|
||||
llarp_threadpool_queue_job(pool, {t, &call_timers});
|
||||
}
|
||||
|
||||
void
|
||||
|
|
Loading…
Reference in New Issue