add single threaded mode for shadow

This commit is contained in:
Jeff Becker 2018-06-06 08:46:26 -04:00
parent 4fd0ef6984
commit 0278ba559c
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05
31 changed files with 570 additions and 248 deletions

View File

@ -1,10 +1,10 @@
cmake_minimum_required(VERSION 2.8.10)
set(WITH_SHARED OFF)
set(DEBUG_FLAGS "")
set(DEBUG_FLAGS "-g")
set(OPTIMIZE_FLAGS "-Os")
if(ASAN)
set(DEBUG_FLAGS "${DEBUG_FLAGS} -g -fsanitize=address -fno-omit-frame-pointer")
set(DEBUG_FLAGS "${DEBUG_FLAGS} -fsanitize=address -fno-omit-frame-pointer")
set(OPTIMIZE_FLAGS "-O0")
endif(ASAN)

View File

@ -13,17 +13,11 @@ clean:
rm -f *.sig
debug-configure: clean
cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DASAN=true
cmake -GNinja -DCMAKE_BUILD_TYPE=Debug
release-configure: clean
cmake -GNinja -DCMAKE_BUILD_TYPE=Release -DRELEASE_MOTTO="$(shell cat motto.txt)"
configure: clean
cmake -GNinja -DCMAKE_BUILD_TYPE=Debug
build: configure
ninja
debug: debug-configure
ninja

View File

@ -11,10 +11,11 @@ namespace llarp
{
struct Context
{
Context(std::ostream &stdout);
Context(std::ostream &stdout, bool signleThread = false);
~Context();
int num_nethreads = 1;
int num_nethreads = 1;
bool singleThreaded = false;
std::vector< std::thread > netio_threads;
llarp_crypto crypto;
llarp_router *router = nullptr;

View File

@ -18,22 +18,17 @@ namespace llarp
AlignedBuffer(const byte_t* data)
{
for(size_t idx = 0; idx < sz; ++idx)
buf.b[idx] = data[idx];
b[idx] = data[idx];
}
AlignedBuffer&
operator=(const byte_t* data)
{
for(size_t idx = 0; idx < sz; ++idx)
buf.b[idx] = data[idx];
b[idx] = data[idx];
return *this;
}
byte_t& operator[](size_t idx)
{
return buf.b[idx];
}
friend std::ostream&
operator<<(std::ostream& out, const AlignedBuffer& self)
{
@ -41,7 +36,7 @@ namespace llarp
out << std::hex << std::setw(2) << std::setfill('0');
while(idx < sz)
{
out << (int)self.buf.b[idx++];
out << (int)self.b[idx++];
}
return out << std::dec << std::setw(0) << std::setfill(' ');
}
@ -68,54 +63,54 @@ namespace llarp
Zero()
{
for(size_t idx = 0; sz < idx / 8; ++idx)
buf.l[idx] = 0;
l[idx] = 0;
}
void
Randomize()
{
randombytes(buf.b, sz);
randombytes(l, sz);
}
byte_t*
data()
{
return &buf.b[0];
return &b[0];
}
const byte_t*
data() const
{
return &buf.b[0];
return &b[0];
}
uint64_t*
data_l()
{
return &buf.l[0];
return &l[0];
}
const uint64_t*
data_l() const
{
return &buf.l[0];
return &l[0];
}
operator const byte_t*() const
{
return &buf.b[0];
return &b[0];
}
operator byte_t*()
{
return &buf.b[0];
return &b[0];
}
private:
union {
byte_t b[sz];
uint64_t l[sz / 8];
} buf;
};
};
}

View File

@ -169,13 +169,13 @@ struct iwp_async_frame
struct llarp_async_iwp *iwp;
void *user;
/// current session key
uint8_t *sessionkey;
byte_t *sessionkey;
/// size of the frame
size_t sz;
/// result handler
iwp_async_frame_hook hook;
/// memory holding the entire frame
uint8_t buf[1500];
byte_t buf[1500];
};
/// decrypt iwp frame asynchronously

View File

@ -17,6 +17,10 @@
extern "C" {
#endif
// forward declare
struct llarp_threadpool;
struct llarp_logic;
struct llarp_ev_loop;
/// allocator
@ -31,6 +35,11 @@ llarp_ev_loop_free(struct llarp_ev_loop **ev);
int
llarp_ev_loop_run(struct llarp_ev_loop *ev);
void
llarp_ev_loop_run_single_process(struct llarp_ev_loop *ev,
struct llarp_threadpool *tp,
struct llarp_logic *logic);
/// stop event loop and wait for it to complete all jobs
void
llarp_ev_loop_stop(struct llarp_ev_loop *ev);

View File

@ -23,6 +23,7 @@ namespace llarp
RouterID remote = {};
uint64_t version = 0;
ILinkMessage() = default;
ILinkMessage(const RouterID& id);
virtual ~ILinkMessage(){};

View File

@ -52,7 +52,7 @@ namespace llarp
if(_glog.minlevel > lvl)
return;
std::stringstream ss("");
std::stringstream ss;
switch(lvl)
{
case eLogDebug:
@ -72,16 +72,17 @@ namespace llarp
ss << "[ERR] ";
break;
}
auto t = std::time(nullptr);
auto now = std::localtime(&t);
std::time_t t;
std::time(&t);
std::string tag = fname;
auto pos = tag.rfind('/');
if(pos != std::string::npos)
tag = tag.substr(pos + 1);
while(tag.size() % 8)
tag += " ";
ss << std::put_time(now, "%F %T") << " " << tag << "\t";
ss << std::put_time(std::localtime(&t), "%F %T") << " " << tag;
auto sz = tag.size() % 8;
while(sz--)
ss << " ";
ss << "\t";
LogAppend(ss, std::forward< TArgs >(args)...);
ss << (char)27 << "[0;0m";
_glog.out << ss.str() << std::endl;

View File

@ -12,6 +12,14 @@ struct llarp_logic;
struct llarp_logic*
llarp_init_logic();
/// single threaded mode logic event loop
struct llarp_logic*
llarp_init_single_process_logic(struct llarp_threadpool* tp);
/// single threaded tick
void
llarp_logic_tick(struct llarp_logic* logic);
void
llarp_free_logic(struct llarp_logic** logic);

View File

@ -4,24 +4,22 @@
namespace llarp
{
const std::size_t MAX_DISCARD_SIZE = 10000;
/// a dummy link message that is discarded
struct DiscardMessage : public ILinkMessage
{
std::vector< byte_t > Z;
~DiscardMessage()
{
}
byte_t pad[MAX_DISCARD_SIZE];
size_t sz = 0;
DiscardMessage(const RouterID& id) : ILinkMessage(id)
{
}
DiscardMessage(const RouterID& other, std::size_t padding)
: ILinkMessage(other)
DiscardMessage(std::size_t padding) : ILinkMessage()
{
Z.resize(padding);
std::fill(Z.begin(), Z.end(), 'z');
sz = padding;
memset(pad, 'z', sz);
}
virtual bool
@ -38,8 +36,10 @@ namespace llarp
{
if(!bencode_read_string(buf, &strbuf))
return false;
Z.resize(strbuf.sz);
memcpy(Z.data(), strbuf.base, strbuf.sz);
if(strbuf.sz > MAX_DISCARD_SIZE)
return false;
sz = strbuf.sz;
memcpy(pad, strbuf.base, sz);
return true;
}
return false;
@ -61,7 +61,7 @@ namespace llarp
if(!bencode_write_bytestring(buf, "z", 1))
return false;
if(!bencode_write_bytestring(buf, Z.data(), Z.size()))
if(!bencode_write_bytestring(buf, pad, sz))
return false;
return bencode_end(buf);
@ -71,7 +71,7 @@ namespace llarp
HandleMessage(llarp_router* router) const
{
(void)router;
llarp::Info("got discard message of size ", Z.size(), " bytes");
llarp::Info("got discard message of size ", sz, " bytes");
return true;
}
};

View File

@ -5,7 +5,7 @@ namespace llarp
{
struct LinkIntroMessage : public ILinkMessage
{
LinkIntroMessage(llarp_rc* rc) : ILinkMessage({}), RC(rc)
LinkIntroMessage(llarp_rc* rc) : ILinkMessage(), RC(rc)
{
}

View File

@ -8,6 +8,11 @@ struct llarp_threadpool;
struct llarp_threadpool *
llarp_init_threadpool(int workers, const char *name);
/// for single process mode
struct llarp_threadpool *
llarp_init_same_process_threadpool();
void
llarp_free_threadpool(struct llarp_threadpool **tp);
@ -20,8 +25,24 @@ struct llarp_thread_job
void *user;
/** called in threadpool worker thread */
llarp_thread_work_func work;
#ifdef __cplusplus
llarp_thread_job(void *u, llarp_thread_work_func w) : user(u), work(w)
{
}
llarp_thread_job() : user(nullptr), work(nullptr)
{
}
#endif
};
/// for single process mode
void
llarp_threadpool_tick(struct llarp_threadpool *tp);
void
llarp_threadpool_queue_job(struct llarp_threadpool *tp,
struct llarp_thread_job j);

View File

@ -39,6 +39,11 @@ llarp_timer_stop(struct llarp_timer_context *t);
void
llarp_timer_run(struct llarp_timer_context *t, struct llarp_threadpool *pool);
/// single threaded run timer, tick all timers
void
llarp_timer_tick_all(struct llarp_timer_context *t,
struct llarp_threadpool *pool);
void
llarp_free_timer(struct llarp_timer_context **t);

View File

@ -20,7 +20,7 @@ namespace llarp
StackBuffer(T& stack)
{
llarp_buffer_t buff;
buff.base = stack;
buff.base = &stack[0];
buff.cur = buff.base;
buff.sz = sizeof(stack);
return buff;

View File

@ -10,7 +10,8 @@
namespace llarp
{
Context::Context(std::ostream &stdout) : out(stdout)
Context::Context(std::ostream &stdout, bool singleThread)
: singleThreaded(singleThread), out(stdout)
{
llarp::Info(LLARP_VERSION, " ", LLARP_RELEASE_MOTTO);
}
@ -50,7 +51,7 @@ namespace llarp
Context *ctx = static_cast< Context * >(itr->user);
if(!strcmp(section, "router"))
{
if(!strcmp(key, "worker-threads"))
if(!strcmp(key, "worker-threads") && !ctx->singleThreaded)
{
int workers = atoi(val);
if(workers > 0 && ctx->worker == nullptr)
@ -63,6 +64,8 @@ namespace llarp
ctx->num_nethreads = atoi(val);
if(ctx->num_nethreads <= 0)
ctx->num_nethreads = 1;
if(ctx->singleThreaded)
ctx->num_nethreads = 0;
}
}
if(!strcmp(section, "netdb"))
@ -87,10 +90,20 @@ namespace llarp
if(llarp_nodedb_ensure_dir(nodedb_dir))
{
// ensure worker thread pool
if(!worker)
if(!worker && !singleThreaded)
worker = llarp_init_threadpool(2, "llarp-worker");
else if(singleThreaded)
{
llarp::Info("running in single threaded mode");
worker = llarp_init_same_process_threadpool();
}
// ensure netio thread
logic = llarp_init_logic();
if(singleThreaded)
{
logic = llarp_init_single_process_logic(worker);
}
else
logic = llarp_init_logic();
router = llarp_init_router(worker, mainloop, logic);
@ -103,22 +116,31 @@ namespace llarp
}
llarp_run_router(router, nodedb);
// run net io thread
auto netio = mainloop;
while(num_nethreads--)
if(singleThreaded)
{
netio_threads.emplace_back([netio]() { llarp_ev_loop_run(netio); });
llarp::Info("running mainloop");
llarp_ev_loop_run_single_process(mainloop, worker, logic);
}
else
{
auto netio = mainloop;
while(num_nethreads--)
{
netio_threads.emplace_back(
[netio]() { llarp_ev_loop_run(netio); });
#if(__APPLE__ && __MACH__)
#elif(__FreeBSD__)
pthread_set_name_np(netio_threads.back().native_handle(),
"llarp-netio");
pthread_set_name_np(netio_threads.back().native_handle(),
"llarp-netio");
#else
pthread_setname_np(netio_threads.back().native_handle(),
"llarp-netio");
pthread_setname_np(netio_threads.back().native_handle(),
"llarp-netio");
#endif
}
llarp::Info("running mainloop");
llarp_logic_mainloop(logic);
}
llarp::Info("Ready");
llarp_logic_mainloop(logic);
return 0;
}
else

View File

@ -322,6 +322,7 @@ namespace iwp
{
iwp_async_frame *frame = static_cast< iwp_async_frame * >(user);
frame->hook(frame);
delete frame;
}
void
@ -329,9 +330,9 @@ namespace iwp
{
iwp_async_frame *frame = static_cast< iwp_async_frame * >(user);
auto crypto = frame->iwp->crypto;
auto hmac = frame->buf;
auto nonce = frame->buf + 32;
auto body = frame->buf + 64;
byte_t *hmac = frame->buf;
byte_t *nonce = frame->buf + 32;
byte_t *body = frame->buf + 64;
llarp_sharedkey_t digest;
@ -350,7 +351,7 @@ namespace iwp
buf.sz = frame->sz - 64;
crypto->xchacha20(buf, frame->sessionkey, nonce);
// inform result
llarp_logic_queue_job(frame->iwp->logic, {user, &inform_frame_done});
llarp_logic_queue_job(frame->iwp->logic, {frame, &inform_frame_done});
}
void
@ -358,9 +359,9 @@ namespace iwp
{
iwp_async_frame *frame = static_cast< iwp_async_frame * >(user);
auto crypto = frame->iwp->crypto;
auto hmac = frame->buf;
auto nonce = frame->buf + 32;
auto body = frame->buf + 64;
byte_t *hmac = frame->buf;
byte_t *nonce = frame->buf + 32;
byte_t *body = frame->buf + 64;
llarp_buffer_t buf;
buf.base = body;
@ -376,8 +377,9 @@ namespace iwp
buf.cur = buf.base;
buf.sz = frame->sz - 32;
crypto->hmac(hmac, buf, frame->sessionkey);
// inform result
llarp_logic_queue_job(frame->iwp->logic, {user, &inform_frame_done});
// call result RIGHT HERE
frame->hook(frame);
delete frame;
}
}

View File

@ -434,6 +434,7 @@ namespace llarp
Context::CleanupTX()
{
auto now = llarp_time_now_ms();
llarp::Debug("DHT tick");
std::set< TXOwner > expired;
for(auto &item : pendingTX)
@ -447,10 +448,11 @@ namespace llarp
if(e.requester != ourKey)
{
// inform not found
auto msg = new llarp::DHTImmeidateMessage(e.requester);
msg->msgs.push_back(
llarp::DHTImmeidateMessage msg(e.requester);
msg.msgs.push_back(
new GotRouterMessage(e.requester, e.txid, nullptr));
router->SendToOrQueue(e.requester, {msg});
llarp::Info("DHT reply to ", e.requester);
router->SendTo(e.requester, &msg);
}
}

View File

@ -0,0 +1,9 @@
#include <llarp/messages/discard.hpp>
namespace llarp
{
DiscardMessage::~DiscardMessage()
{
llarp::Debug("~DiscardMessage");
}
}

View File

@ -1,14 +1,15 @@
#include <llarp/ev.h>
#include <llarp/logic.h>
#include "mem.hpp"
#ifdef __linux__
# include "ev_epoll.hpp"
#include "ev_epoll.hpp"
#endif
#if (__APPLE__ && __MACH__)
# include "ev_kqueue.hpp"
#if(__APPLE__ && __MACH__)
#include "ev_kqueue.hpp"
#endif
#ifdef __FreeBSD__
# include "ev_kqueue.hpp"
#include "ev_kqueue.hpp"
#endif
extern "C" {
@ -19,7 +20,7 @@ llarp_ev_loop_alloc(struct llarp_ev_loop **ev)
#ifdef __linux__
*ev = new llarp_epoll_loop;
#endif
#if (__APPLE__ && __MACH__)
#if(__APPLE__ && __MACH__)
*ev = new llarp_kqueue_loop;
#endif
#ifdef __FreeBSD__
@ -41,6 +42,20 @@ llarp_ev_loop_run(struct llarp_ev_loop *ev)
return ev->run();
}
void
llarp_ev_loop_run_single_process(struct llarp_ev_loop *ev,
struct llarp_threadpool *tp,
struct llarp_logic *logic)
{
while(true)
{
llarp_logic_tick(logic);
llarp_threadpool_tick(tp);
if(ev->tick() == -1)
return;
}
}
int
llarp_ev_add_udp(struct llarp_ev_loop *ev, struct llarp_udp_io *udp,
const struct sockaddr *src)

View File

@ -27,6 +27,10 @@ struct llarp_ev_loop
init() = 0;
virtual int
run() = 0;
virtual int
tick() = 0;
virtual void
stop() = 0;

View File

@ -100,6 +100,40 @@ struct llarp_epoll_loop : public llarp_ev_loop
return false;
}
int
tick()
{
epoll_event events[1024];
int result;
byte_t readbuf[2048];
result = epoll_wait(epollfd, events, 1024, 100);
if(result > 0)
{
int idx = 0;
while(idx < result)
{
// handle signalfd
if(events[idx].data.fd == pipefds[0])
{
llarp::Debug("exiting epoll loop");
return 0;
}
llarp::ev_io* ev = static_cast< llarp::ev_io* >(events[idx].data.ptr);
if(events[idx].events & EPOLLIN)
{
if(ev->read(readbuf, sizeof(readbuf)) == -1)
{
llarp::Debug("close ev");
close_ev(ev);
}
}
++idx;
}
}
return result;
}
int
run()
{

View File

@ -6,11 +6,10 @@
#if __FreeBSD__
// kqueue / kevent
//# include <sys/types.h> // already in net.h
# include <sys/event.h>
# include <sys/time.h>
#include <sys/event.h>
#include <sys/time.h>
#endif
//#include <sys/socket.h>
//#include <ifaddrs.h>
@ -73,7 +72,7 @@ namespace llarp
struct llarp_kqueue_loop : public llarp_ev_loop
{
int kqueuefd;
struct kevent change; /* event we want to monitor */
struct kevent change; /* event we want to monitor */
llarp_kqueue_loop() : kqueuefd(-1)
{
@ -86,12 +85,39 @@ struct llarp_kqueue_loop : public llarp_ev_loop
bool
init()
{
if (kqueuefd == -1) {
if(kqueuefd == -1)
{
kqueuefd = kqueue();
}
return kqueuefd != -1;
}
int
tick()
{
struct kevent events[1024];
int result;
byte_t readbuf[2048];
result = kevent(kqueuefd, NULL, 0, events, 1024, NULL);
// result: 0 is a timeout
if(result > 0)
{
int idx = 0;
while(idx < result)
{
llarp::ev_io* ev = static_cast< llarp::ev_io* >(events[idx].udata);
if(ev->read(readbuf, sizeof(readbuf)) == -1)
{
llarp::Info(__FILE__, "close ev");
close_ev(ev);
delete ev;
}
++idx;
}
}
return result;
}
int
run()
{
@ -102,7 +128,7 @@ struct llarp_kqueue_loop : public llarp_ev_loop
{
result = kevent(kqueuefd, NULL, 0, events, 1024, NULL);
// result: 0 is a timeout
if (result > 0)
if(result > 0)
{
int idx = 0;
while(idx < result)
@ -191,7 +217,8 @@ struct llarp_kqueue_loop : public llarp_ev_loop
llarp::udp_listener* listener = new llarp::udp_listener(fd, l);
EV_SET(&change, fd, EVFILT_READ, EV_ADD, 0, 0, listener);
if (kevent(kqueuefd, &change, 1, NULL, 0, NULL) == -1) {
if(kevent(kqueuefd, &change, 1, NULL, 0, NULL) == -1)
{
delete listener;
return false;
}

View File

@ -42,7 +42,33 @@ namespace iwp
eFRAG = 0x03
};
typedef std::vector< byte_t > sendbuf_t;
struct sendbuf_t
{
sendbuf_t(size_t s) : sz(s)
{
buf = new byte_t[s];
}
~sendbuf_t()
{
delete[] buf;
}
byte_t *buf;
size_t sz;
size_t
size() const
{
return sz;
}
byte_t *
data()
{
return buf;
}
};
enum header_flag
{
@ -107,15 +133,14 @@ namespace iwp
};
byte_t *
init_sendbuf(sendbuf_t &buf, msgtype t, uint16_t sz, uint8_t flags)
init_sendbuf(sendbuf_t *buf, msgtype t, uint16_t sz, uint8_t flags)
{
buf.resize(6 + sz);
frame_header hdr(buf.data());
frame_header hdr(buf->data());
hdr.version() = 0;
hdr.msgtype() = t;
hdr.setsize(sz);
buf[4] = 0;
buf[5] = flags;
buf->data()[4] = 0;
buf->data()[5] = flags;
return hdr.data();
}
@ -124,9 +149,7 @@ namespace iwp
{
byte_t buffer[48];
xmit()
{
}
xmit() = default;
xmit(byte_t *ptr)
{
@ -213,28 +236,25 @@ namespace iwp
}
};
typedef std::vector< uint8_t > fragment_t;
// forward declare
struct session;
struct server;
struct transit_message
{
session *parent = nullptr;
xmit msginfo;
std::bitset< 32 > status;
std::bitset< 32 > status = {};
std::map< uint8_t, fragment_t > frags;
typedef std::vector< byte_t > fragment_t;
std::unordered_map< byte_t, fragment_t > frags;
fragment_t lastfrag;
transit_message()
{
}
~transit_message()
void
clear()
{
frags.clear();
lastfrag.clear();
}
// calculate acked bitmask
@ -251,6 +271,13 @@ namespace iwp
return bitmask;
}
// outbound
transit_message(llarp_buffer_t buf, const byte_t *hash, uint64_t id,
uint16_t mtu = 1024)
{
put_message(buf, hash, id, mtu);
}
// inbound
transit_message(const xmit &x) : msginfo(x)
{
@ -264,11 +291,6 @@ namespace iwp
status.reset();
}
// outbound
transit_message(session *s) : parent(s)
{
}
/// ack packets based off a bitmask
void
ack(uint32_t bitmask)
@ -307,10 +329,9 @@ namespace iwp
void
generate_xmit(T &queue, byte_t flags = 0)
{
queue.emplace();
auto &xmitbuf = queue.back();
auto body_ptr = init_sendbuf(
xmitbuf, eXMIT, sizeof(msginfo.buffer) + lastfrag.size(), flags);
uint16_t sz = lastfrag.size() + sizeof(msginfo.buffer);
queue.push(new sendbuf_t(sz + 6));
auto body_ptr = init_sendbuf(queue.back(), eXMIT, sz, flags);
memcpy(body_ptr, msginfo.buffer, sizeof(msginfo.buffer));
body_ptr += sizeof(msginfo.buffer);
memcpy(body_ptr, lastfrag.data(), lastfrag.size());
@ -326,9 +347,10 @@ namespace iwp
{
if(status.test(frag.first))
continue;
queue.emplace();
auto &fragbuf = queue.back();
auto body_ptr = init_sendbuf(fragbuf, eFRAG, 9 + fragsize, flags);
uint16_t sz = 9 + fragsize;
queue.push(new sendbuf_t(sz + 6));
auto body_ptr = init_sendbuf(queue.back(), eFRAG, sz, flags);
// TODO: assumes big endian
memcpy(body_ptr, &msgid, 8);
body_ptr[8] = frag.first;
memcpy(body_ptr + 9, frag.second.data(), fragsize);
@ -402,10 +424,10 @@ namespace iwp
uint64_t rxids = 0;
uint64_t txids = 0;
llarp_time_t lastEvent = 0;
std::map< uint64_t, transit_message * > rx;
std::map< uint64_t, transit_message * > tx;
std::unordered_map< uint64_t, transit_message * > rx;
std::unordered_map< uint64_t, transit_message * > tx;
typedef std::queue< sendbuf_t > sendqueue_t;
typedef std::queue< sendbuf_t * > sendqueue_t;
llarp_router *router = nullptr;
llarp_link_session *parent = nullptr;
@ -422,11 +444,13 @@ namespace iwp
void
clear()
{
for(auto &item : rx)
auto _rx = rx;
auto _tx = tx;
for(auto &item : _rx)
delete item.second;
for(auto &item : _tx)
delete item.second;
rx.clear();
for(auto &item : tx)
delete item.second;
tx.clear();
}
@ -437,16 +461,15 @@ namespace iwp
push_ackfor(uint64_t id, uint32_t bitmask)
{
llarp::Debug("ACK for msgid=", id, " mask=", bitmask);
sendqueue.emplace();
auto &buf = sendqueue.back();
init_sendbuf(buf, eACKS, 12, txflags);
sendqueue.push(new sendbuf_t(12 + 6));
auto body_ptr = init_sendbuf(sendqueue.back(), eACKS, 12, txflags);
// TODO: this assumes big endian
memcpy(buf.data() + 6, &id, 8);
memcpy(buf.data() + 14, &bitmask, 4);
memcpy(body_ptr, &id, 8);
memcpy(body_ptr + 8, &bitmask, 4);
}
bool
got_xmit(frame_header &hdr, size_t sz)
got_xmit(frame_header hdr, size_t sz)
{
if(hdr.size() > sz)
{
@ -504,7 +527,7 @@ namespace iwp
}
bool
got_frag(frame_header &hdr, size_t sz)
got_frag(frame_header hdr, size_t sz)
{
if(hdr.size() > sz)
{
@ -561,33 +584,37 @@ namespace iwp
}
bool
got_acks(frame_header &hdr, size_t sz);
got_acks(frame_header hdr, size_t sz);
// queue new outbound message
void
queue_tx(uint64_t id, transit_message *msg)
{
auto itr = tx.emplace(id, msg);
if(itr.second)
tx[id] = msg;
msg->generate_xmit(sendqueue, txflags);
}
void
retransmit()
{
for(auto &item : tx)
{
msg->generate_xmit(sendqueue, txflags);
item.second->retransmit_frags(sendqueue, txflags);
}
else // duplicate
delete msg;
}
// get next frame to encrypt and transmit
bool
next_frame(llarp_buffer_t &buf)
next_frame(llarp_buffer_t *buf)
{
auto left = sendqueue.size();
llarp::Debug("next frame, ", left, " frames left in send queue");
if(left)
{
auto &send = sendqueue.front();
buf.base = &send[0];
buf.cur = &send[0];
buf.sz = send.size();
sendbuf_t *send = sendqueue.front();
buf->base = send->data();
buf->cur = send->data();
buf->sz = send->size();
return true;
}
return false;
@ -596,11 +623,13 @@ namespace iwp
void
pop_next_frame()
{
sendbuf_t *buf = sendqueue.front();
sendqueue.pop();
delete buf;
}
bool
process(uint8_t *buf, size_t sz)
process(byte_t *buf, size_t sz)
{
frame_header hdr(buf);
if(hdr.flags() & eSessionInvalidated)
@ -703,12 +732,11 @@ namespace iwp
static bool
sendto(llarp_link_session *s, llarp_buffer_t msg)
{
session *self = static_cast< session * >(s->impl);
transit_message *m = new transit_message(self);
auto id = self->frame.txids++;
session *self = static_cast< session * >(s->impl);
auto id = self->frame.txids++;
llarp_shorthash_t digest;
self->crypto->shorthash(digest, msg);
m->put_message(msg, digest, id);
transit_message *m = new transit_message(msg, digest, id);
self->add_outbound_message(id, m);
return true;
}
@ -731,7 +759,7 @@ namespace iwp
pump()
{
llarp_buffer_t buf;
while(frame.next_frame(buf))
while(frame.next_frame(&buf))
{
encrypt_frame_async_send(buf.base, buf.sz);
frame.pop_next_frame();
@ -794,14 +822,12 @@ namespace iwp
if(llarp::EncodeLIM(&buf, our_router))
{
// rewind message buffer
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
auto msg = new transit_message;
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
// hash message buffer
crypto->shorthash(digest, buf);
// put message buffer
auto id = frame.txids++;
msg->put_message(buf, digest, id);
auto id = frame.txids++;
auto msg = new transit_message(buf, digest, id);
// put into outbound send queue
add_outbound_message(id, msg);
EnterState(eLIMSent);
@ -836,7 +862,11 @@ namespace iwp
}
// send keepalive if we are established or a session is made
if(state == eEstablished || state == eLIMSent)
{
send_keepalive(this);
frame.retransmit();
pump();
}
// TODO: determine if we are too idle
return false;
@ -966,6 +996,7 @@ namespace iwp
{
session *self = static_cast< session * >(frame->user);
llarp::Debug("rx ", frame->sz, " frames=", self->frames);
self->frames--;
if(frame->success)
{
if(self->frame.process(frame->buf + 64, frame->sz - 64))
@ -978,9 +1009,6 @@ namespace iwp
}
else
llarp::Error("decrypt frame fail");
delete frame;
--self->frames;
}
void
@ -988,8 +1016,8 @@ namespace iwp
{
if(sz > 64)
{
auto frame = alloc_frame(buf, sz);
frame->hook = &handle_frame_decrypt;
iwp_async_frame *frame = alloc_frame(buf, sz);
frame->hook = &handle_frame_decrypt;
iwp_call_async_frame_decrypt(iwp, frame);
}
else
@ -1002,8 +1030,7 @@ namespace iwp
session *self = static_cast< session * >(frame->user);
llarp::Debug("tx ", frame->sz, " frames=", self->frames);
llarp_ev_udp_sendto(self->udp, self->addr, frame->buf, frame->sz);
delete frame;
--self->frames;
self->frames--;
}
iwp_async_frame *
@ -1013,13 +1040,13 @@ namespace iwp
if(sz > 1500)
return nullptr;
iwp_async_frame *frame = new iwp_async_frame;
iwp_async_frame *frame = new iwp_async_frame();
if(buf)
memcpy(frame->buf, buf, sz);
frame->sz = sz;
frame->user = this;
frame->sessionkey = sessionkey;
++frames;
frames++;
return frame;
}
@ -1027,8 +1054,12 @@ namespace iwp
encrypt_frame_async_send(const void *buf, size_t sz)
{
// 64 bytes frame overhead for nonce and hmac
auto frame = alloc_frame(nullptr, sz + 64);
iwp_async_frame *frame = alloc_frame(nullptr, sz + 64);
memcpy(frame->buf + 64, buf, sz);
auto padding = rand() % MAX_PAD;
if(padding)
crypto->randbytes(frame->buf + 64 + sz, padding);
frame->sz += padding;
frame->hook = &handle_frame_encrypt;
iwp_call_async_frame_encrypt(iwp, frame);
}
@ -1210,9 +1241,7 @@ namespace iwp
char keyfile[255];
uint32_t timeout_job_id;
typedef std::unordered_map< llarp::Addr, llarp_link_session,
llarp::addrhash >
LinkMap_t;
typedef std::map< llarp::Addr, llarp_link_session > LinkMap_t;
LinkMap_t m_sessions;
mtx_t m_sessions_Mutex;
@ -1251,8 +1280,9 @@ namespace iwp
HasSessionToRouter(llarp_link *l, const byte_t *pubkey)
{
server *serv = static_cast< server * >(l->impl);
llarp::pubkey pk(pubkey);
lock_t lock(serv->m_Connected_Mutex);
return serv->m_Connected.find(pubkey) != serv->m_Connected.end();
return serv->m_Connected.find(pk) != serv->m_Connected.end();
}
void
@ -1262,12 +1292,11 @@ namespace iwp
{
lock_t lock(m_sessions_Mutex);
std::set< llarp::Addr > remove;
auto itr = m_sessions.begin();
while(itr != m_sessions.end())
for(auto &itr : m_sessions)
{
if(static_cast< session * >(itr->second.impl)->Tick(now))
remove.insert(itr->first);
++itr;
session *s = static_cast< session * >(itr.second.impl);
if(s && s->Tick(now))
remove.insert(itr.first);
}
for(const auto &addr : remove)
@ -1288,7 +1317,7 @@ namespace iwp
auto inner_itr = serv->m_sessions.find(itr->second);
if(inner_itr != serv->m_sessions.end())
{
auto link = &inner_itr->second;
llarp_link_session *link = &inner_itr->second;
return link->sendto(link, buf);
}
}
@ -1550,22 +1579,17 @@ namespace iwp
return;
}
// all zeros means keepalive
byte_t tmp[MAX_PAD + 8] = {0};
byte_t tmp[8] = {0};
// set flags for tx
frame_header hdr(tmp);
hdr.flags() = self->frame.txflags;
// 8 bytes iwp header overhead
int padsz = rand() % (sizeof(tmp) - 8);
auto buf = llarp::StackBuffer< decltype(tmp) >(tmp);
if(padsz)
self->crypto->randbytes(buf.base + 8, padsz);
buf.sz -= padsz;
// send frame after encrypting
auto buf = llarp::StackBuffer< decltype(tmp) >(tmp);
self->encrypt_frame_async_send(buf.base, buf.sz);
}
bool
frame_state::got_acks(frame_header &hdr, size_t sz)
frame_state::got_acks(frame_header hdr, size_t sz)
{
if(hdr.size() > sz)
{
@ -1592,24 +1616,26 @@ namespace iwp
return false;
}
itr->second->ack(bitmask);
transit_message *msg = itr->second;
if(itr->second->completed())
msg->ack(bitmask);
if(msg->completed())
{
llarp::Debug("message transmitted msgid=", msgid);
delete itr->second;
tx.erase(itr);
session *impl = static_cast< session * >(parent->impl);
if(impl->state == session::eLIMSent && msgid == 0)
{
// first message acked we are established?
impl->session_established();
}
tx.erase(msgid);
delete msg;
}
else
{
llarp::Debug("message ", msgid, " retransmit fragments");
itr->second->retransmit_frags(sendqueue);
msg->retransmit_frags(sendqueue, txflags);
}
return true;
@ -1721,7 +1747,7 @@ namespace iwp
link->timeout_job_id = 0;
link->logic = logic;
// start cleanup timer
link->issue_cleanup_timer(1000);
link->issue_cleanup_timer(2500);
return true;
}
@ -1739,11 +1765,17 @@ namespace iwp
link_iter_sessions(struct llarp_link *l, struct llarp_link_session_iter iter)
{
server *link = static_cast< server * >(l->impl);
iter.link = l;
// TODO: race condition with cleanup timer
for(auto &item : link->m_sessions)
if(!iter.visit(&iter, &item.second))
return;
auto sz = link->m_sessions.size();
if(sz)
{
llarp::Debug("we have ", sz, "sessions");
iter.link = l;
// TODO: race condition with cleanup timer
for(auto &item : link->m_sessions)
if(item.second.impl)
if(!iter.visit(&iter, &item.second))
return;
}
}
bool

View File

@ -22,6 +22,24 @@ llarp_init_logic()
return logic;
};
struct llarp_logic*
llarp_init_single_process_logic(struct llarp_threadpool* tp)
{
llarp_logic* logic = new llarp_logic;
if(logic)
{
logic->thread = tp;
logic->timer = llarp_init_timer();
}
return logic;
}
void
llarp_logic_tick(struct llarp_logic* logic)
{
llarp_timer_tick_all(logic->timer, logic->thread);
}
void
llarp_free_logic(struct llarp_logic** logic)
{
@ -58,13 +76,20 @@ llarp_logic_mainloop(struct llarp_logic* logic)
void
llarp_logic_queue_job(struct llarp_logic* logic, struct llarp_thread_job job)
{
llarp_threadpool_queue_job(logic->thread, job);
llarp_thread_job j;
j.user = job.user;
j.work = job.work;
llarp_threadpool_queue_job(logic->thread, j);
}
uint32_t
llarp_logic_call_later(struct llarp_logic* logic, struct llarp_timeout_job job)
{
return llarp_timer_call_later(logic->timer, job);
llarp_timeout_job j;
j.user = job.user;
j.timeout = job.timeout;
j.handler = job.handler;
return llarp_timer_call_later(logic->timer, j);
}
void

View File

@ -178,6 +178,21 @@ namespace llarp
return ntohs(_addr.sin6_port);
}
bool
operator<(const Addr& other) const
{
int a = af();
if(a == other.af())
{
if(a == AF_INET)
{
return port() < other.port() && memcmp(addr4(), other.addr4(), 4) < 0;
}
}
return af() < other.af() && port() < other.port()
&& memcmp(addr6(), other.addr6(), 16) < 0;
}
bool
operator==(const Addr& other) const
{

View File

@ -281,11 +281,14 @@ void
llarp_router::Tick()
{
llarp::Debug("tick router");
llarp_link_session_iter iter;
iter.user = this;
iter.visit = &send_padded_message;
if(sendPadding)
{
for(auto &link : links)
for(auto link : links)
{
link->iter_sessions(link, {this, nullptr, &send_padded_message});
link->iter_sessions(link, iter);
}
}
}
@ -294,12 +297,43 @@ bool
llarp_router::send_padded_message(llarp_link_session_iter *itr,
llarp_link_session *peer)
{
auto msg = new llarp::DiscardMessage({}, 4096);
llarp_router *self = static_cast< llarp_router * >(itr->user);
self->SendToOrQueue(peer->get_remote_router(peer)->pubkey, {msg});
llarp::RouterID remote;
remote = &peer->get_remote_router(peer)->pubkey[0];
for(size_t idx = 0; idx < 50; ++idx)
{
llarp::DiscardMessage msg(9000);
self->SendTo(remote, &msg);
}
return true;
}
void
llarp_router::SendTo(llarp::RouterID remote, llarp::ILinkMessage *msg)
{
llarp_buffer_t buf =
llarp::StackBuffer< decltype(linkmsg_buffer) >(linkmsg_buffer);
if(!msg->BEncode(&buf))
{
llarp::Warn("failed to encode outbound message, buffer size left: ",
llarp_buffer_size_left(buf));
return;
}
// set size of message
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
bool sent = false;
for(auto link : links)
{
if(!sent)
{
sent = link->sendto(link, remote, buf);
}
}
}
void
llarp_router::ScheduleTicker(uint64_t ms)
{
@ -322,6 +356,7 @@ llarp_router::SessionClosed(const llarp::RouterID &remote)
void
llarp_router::FlushOutboundFor(const llarp::RouterID &remote)
{
llarp::Debug("Flush outbound for ", remote);
auto itr = outboundMesssageQueue.find(remote);
if(itr == outboundMesssageQueue.end())
return;
@ -343,12 +378,13 @@ llarp_router::FlushOutboundFor(const llarp::RouterID &remote)
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
bool sent = false;
llarp::RouterID peer = remote;
bool sent = false;
for(auto &link : links)
{
if(!sent)
{
sent = link->sendto(link, remote, buf);
sent = link->sendto(link, peer, buf);
}
}
if(!sent)
@ -366,8 +402,11 @@ llarp_router::on_try_connect_result(llarp_link_establish_job *job)
llarp_router *router = static_cast< llarp_router * >(job->user);
if(job->session)
{
delete job;
/*
auto session = job->session;
router->async_verify_RC(session, false, job);
*/
return;
}
llarp::Info("session not established");

View File

@ -119,6 +119,10 @@ struct llarp_router
SendToOrQueue(const llarp::RouterID &remote,
std::vector< llarp::ILinkMessage * > msgs);
/// sendto or drop
void
SendTo(llarp::RouterID remote, llarp::ILinkMessage *msg);
/// manually flush outbound message queue for just 1 router
void
FlushOutboundFor(const llarp::RouterID &remote);

View File

@ -1,6 +1,10 @@
#include "threadpool.hpp"
#include <pthread.h>
#include <cstring>
#include <llarp/time.h>
#include <queue>
#include "logger.hpp"
#if(__FreeBSD__)
@ -29,18 +33,24 @@ namespace llarp
}
for(;;)
{
llarp_thread_job job;
llarp_thread_job *job;
{
lock_t lock(this->queue_mutex);
this->condition.wait(
lock, [this] { return this->stop || !this->jobs.empty(); });
if(this->stop && this->jobs.empty())
return;
job = std::move(this->jobs.front());
job = this->jobs.front();
this->jobs.pop_front();
}
auto now = llarp_time_now_ms();
// do work
job.work(job.user);
job->work(job->user);
auto after = llarp_time_now_ms();
auto dlt = after - now;
if(dlt > 1)
llarp::Warn("work took ", dlt, " ms");
delete job;
}
});
}
@ -75,7 +85,7 @@ namespace llarp
if(stop)
return;
jobs.emplace_back(job);
jobs.push_back(new llarp_thread_job(job.user, job.work));
}
condition.notify_one();
}
@ -85,9 +95,16 @@ namespace llarp
struct llarp_threadpool
{
llarp::thread::Pool impl;
llarp::thread::Pool *impl;
llarp_threadpool(int workers, const char *name) : impl(workers, name)
std::queue< llarp_thread_job > jobs;
llarp_threadpool(int workers, const char *name)
: impl(new llarp::thread::Pool(workers, name))
{
}
llarp_threadpool() : impl(nullptr)
{
}
};
@ -103,11 +120,18 @@ llarp_init_threadpool(int workers, const char *name)
return nullptr;
}
struct llarp_threadpool *
llarp_init_same_process_threadpool()
{
return new llarp_threadpool();
}
void
llarp_threadpool_join(struct llarp_threadpool *pool)
{
llarp::Debug("threadpool join");
pool->impl.Join();
if(pool->impl)
pool->impl->Join();
}
void
@ -119,7 +143,8 @@ void
llarp_threadpool_stop(struct llarp_threadpool *pool)
{
llarp::Debug("threadpool stop");
pool->impl.Stop();
if(pool->impl)
pool->impl->Stop();
}
void
@ -127,9 +152,10 @@ llarp_threadpool_wait(struct llarp_threadpool *pool)
{
std::mutex mtx;
llarp::Debug("threadpool wait");
if(pool->impl)
{
std::unique_lock< std::mutex > lock(mtx);
pool->impl.done.wait(lock);
pool->impl->done.wait(lock);
}
}
@ -137,7 +163,21 @@ void
llarp_threadpool_queue_job(struct llarp_threadpool *pool,
struct llarp_thread_job job)
{
pool->impl.QueueJob(job);
if(pool->impl)
pool->impl->QueueJob(job);
else
pool->jobs.push(job);
}
void
llarp_threadpool_tick(struct llarp_threadpool *pool)
{
while(pool->jobs.size())
{
auto &job = pool->jobs.front();
job.work(job.user);
pool->jobs.pop();
}
}
void

View File

@ -25,7 +25,7 @@ namespace llarp
void
Stop();
std::vector< std::thread > threads;
std::deque< llarp_thread_job > jobs;
std::deque< llarp_thread_job* > jobs;
mtx_t queue_mutex;
std::condition_variable condition;

View File

@ -3,7 +3,7 @@
namespace llarp
{
typedef std::chrono::steady_clock clock_t;
typedef std::chrono::system_clock clock_t;
template < typename Res, typename IntType >
static IntType

View File

@ -54,16 +54,21 @@ namespace llarp
struct llarp_timer_context
{
llarp_threadpool* threadpool;
std::mutex timersMutex;
std::unordered_map< uint32_t, llarp::timer* > timers;
std::mutex tickerMutex;
std::condition_variable ticker;
std::chrono::milliseconds nextTickLen = std::chrono::milliseconds(10);
std::condition_variable* ticker = nullptr;
std::chrono::milliseconds nextTickLen = std::chrono::milliseconds(100);
uint32_t ids = 0;
bool _run = true;
~llarp_timer_context()
{
if(ticker)
delete ticker;
}
bool
run()
{
@ -102,7 +107,7 @@ struct llarp_timer_context
{
std::unique_lock< std::mutex > lock(timersMutex);
uint32_t id = ++ids;
timers.emplace(id, new llarp::timer(timeout_ms, user, func));
timers[id] = new llarp::timer(timeout_ms, user, func);
return id;
}
@ -163,7 +168,8 @@ llarp_timer_stop(struct llarp_timer_context* t)
// don't call callbacks on timers
t->timers.clear();
t->stop();
t->ticker.notify_all();
if(t->ticker)
t->ticker->notify_all();
}
void
@ -172,48 +178,59 @@ llarp_timer_cancel_job(struct llarp_timer_context* t, uint32_t id)
t->cancel(id);
}
void
llarp_timer_tick_all(struct llarp_timer_context* t,
struct llarp_threadpool* pool)
{
if(!t->run())
return;
auto now = llarp_time_now_ms();
auto itr = t->timers.begin();
while(itr != t->timers.end())
{
if(now - itr->second->started >= itr->second->timeout
|| itr->second->canceled)
{
if(itr->second->func && itr->second->called_at == 0)
{
// timer hit
itr->second->called_at = now;
itr->second->send_job(pool);
++itr;
}
else if(itr->second->done)
{
// remove timer
llarp::timer* timer = itr->second;
itr = t->timers.erase(itr);
delete timer;
}
else
++itr;
}
else // timer not hit yet
++itr;
}
}
void
llarp_timer_run(struct llarp_timer_context* t, struct llarp_threadpool* pool)
{
t->threadpool = pool;
t->ticker = new std::condition_variable;
while(t->run())
{
// wait for timer mutex
if(t->ticker)
{
std::unique_lock< std::mutex > lock(t->tickerMutex);
t->ticker.wait_for(lock, t->nextTickLen);
t->ticker->wait_for(lock, t->nextTickLen);
}
if(t->run())
{
std::unique_lock< std::mutex > lock(t->timersMutex);
// we woke up
auto now = llarp_time_now_ms();
auto itr = t->timers.begin();
while(itr != t->timers.end())
{
if(now - itr->second->started >= itr->second->timeout
|| itr->second->canceled)
{
if(itr->second->func && itr->second->called_at == 0)
{
// timer hit
itr->second->called_at = now;
itr->second->send_job(pool);
++itr;
}
else if(itr->second->done)
{
// remove timer
delete itr->second;
itr = t->timers.erase(itr);
}
else
++itr;
}
else // timer not hit yet
++itr;
}
llarp_timer_tick_all(t, pool);
}
}
}