it works.

This commit is contained in:
Jeff Becker 2018-08-12 13:22:29 -04:00
parent e79708c1dc
commit d7c1c3322f
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05
24 changed files with 263 additions and 144 deletions

View File

@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 2.8.10)
set(PROJECT_NAME llarpd)
set(PROJECT_NAME lokinet)
project(${PROJECT_NAME})
macro(add_cflags)
@ -122,7 +122,7 @@ if(RELEASE_MOTTO)
add_definitions(-DLLARP_RELEASE_MOTTO="${RELEASE_MOTTO}")
endif()
set(EXE llarpd)
set(EXE lokinet)
set(EXE_SRC daemon/main.cpp)
if(SODIUM_INCLUDE_DIR)

View File

@ -18,7 +18,7 @@ SHADOW_CONFIG=$(REPO)/shadow.config.xml
SHADOW_PLUGIN=$(REPO)/libshadow-plugin-llarp.so
SHADOW_LOG=$(REPO)/shadow.log.txt
SHADOW_SRC ?= $(HOME)/git/shadow
SHADOW_SRC ?= $(HOME)/local/shadow
SHADOW_PARSE ?= python $(SHADOW_SRC)/src/tools/parse-shadow.py - -m 0 --packet-data
SHADOW_PLOT ?= python $(SHADOW_SRC)/src/tools/plot-shadow.py -d $(REPO) LokiNET -c $(SHADOW_CONFIG) -r 10000 -e '.*'
@ -26,7 +26,9 @@ TESTNET_ROOT=/tmp/lokinet_testnet_tmp
TESTNET_CONF=$(TESTNET_ROOT)/supervisor.conf
TESTNET_LOG=$(TESTNET_ROOT)/testnet.log
TESTNET_EXE=$(REPO)/lokinet
EXE = $(REPO)/lokinet
TESTNET_EXE=$(REPO)/lokinet-testnet
TESTNET_CLIENTS ?= 50
TESTNET_SERVERS ?= 50
TESTNET_DEBUG ?= 0
@ -87,16 +89,14 @@ testnet-configure: testnet-clean
testnet-build: testnet-configure
ninja
$(TESTNET_EXE): testnet-build
cp -f $(REPO)/llarpd $(TESTNET_EXE)
shared-configure: clean
cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DWITH_TESTS=ON -DCMAKE_C_COMPILER=$(CC) -DCMAKE_CXX_COMPILER=$(CXX) -DWITH_SHARED=ON -DTUNTAP=ON
shared: shared-configure
ninja
testnet: $(TESTNET_EXE)
testnet:
cp $(EXE) $(TESTNET_EXE)
mkdir -p $(TESTNET_ROOT)
python3 contrib/testnet/genconf.py --bin=$(TESTNET_EXE) --svc=$(TESTNET_SERVERS) --clients=$(TESTNET_CLIENTS) --dir=$(TESTNET_ROOT) --out $(TESTNET_CONF)
LLARP_DEBUG=$(TESTNET_DEBUG) supervisord -n -d $(TESTNET_ROOT) -l $(TESTNET_LOG) -c $(TESTNET_CONF)
@ -107,12 +107,3 @@ test: debug-configure
format:
clang-format -i $$(find daemon llarp include | grep -E '\.[h,c](pp)?$$')
fuzz-configure: clean
cmake -GNinja -DCMAKE_BUILD_TYPE=Fuzz -DCMAKE_C_COMPILER=afl-gcc -DCMAKE_CXX_COMPILER=afl-g++
fuzz-build: fuzz-configure
ninja
fuzz: fuzz-build
$(EXE)

View File

@ -13,7 +13,7 @@ def getSetting(s, name, fallback): return name in s and s[name] or fallback
shadowRoot = getSetting(os.environ, "SHADOW_ROOT",
os.path.join(os.environ['HOME'], '.shadow'))
libpath = 'libshadow-plugin-llarp.so'
libpath = 'libshadow-plugin-lokinet.so'
def nodeconf(conf, baseDir, name, ifname=None, port=None):

View File

@ -39,5 +39,6 @@ main(int argc, char *argv[])
code = llarp_main_run(ctx);
llarp_main_free(ctx);
}
exit(code);
return code;
}

View File

@ -61,10 +61,12 @@ typedef struct llarp_buffer_t
/// max size of buffer
size_t sz;
#ifdef __cplusplus
const byte_t operator[](size_t x)
{
return *(this->base + x);
}
#endif
} llarp_buffer_t;
/// how much room is left in buffer

View File

@ -51,8 +51,7 @@ namespace llarp
};
template < typename T, typename GetTime, typename PutTime, typename Compare,
typename Mutex_t = std::mutex,
typename Lock_t = std::lock_guard< std::mutex >,
typename Mutex_t = util::Mutex, typename Lock_t = util::Lock,
llarp_time_t dropMs = 5, llarp_time_t initialIntervalMs = 100 >
struct CoDelQueue
{

View File

@ -10,12 +10,8 @@
struct llarp_link
{
/*
typedef std::mutex mtx_t;
typedef std::unique_lock< mtx_t > lock_t;
*/
typedef llarp::util::DummyMutex mtx_t;
typedef llarp::util::DummyLock lock_t;
typedef llarp::util::NullMutex mtx_t;
typedef llarp::util::NullLock lock_t;
llarp_router *router;
llarp_crypto *crypto;

View File

@ -125,10 +125,7 @@ struct llarp_link_session
llarp::util::CoDelQueue< iwp_async_frame *, FrameGetTime, FramePutTime,
FrameCompareTime >
outboundFrames;
/*
std::mutex m_EncryptedFramesMutex;
std::queue< iwp_async_frame > encryptedFrames;
*/
llarp::util::CoDelQueue< iwp_async_frame *, FrameGetTime, FramePutTime,
FrameCompareTime >
decryptedFrames;

View File

@ -31,7 +31,7 @@ namespace llarp
std::string nodeName;
LogLevel minlevel = eLogInfo;
std::ostream& out;
std::mutex access;
llarp::util::Mutex access;
Logger() : Logger(std::cout, "unnamed")
{
#ifdef _WIN32
@ -80,19 +80,24 @@ namespace llarp
std::stringstream ss;
#ifdef ANDROID
int loglev = -1;
switch(lvl)
{
case eLogDebug:
ss << "[DBG] ";
loglev = ANDROID_LOG_DEBUG;
break;
case eLogInfo:
ss << "[NFO] ";
loglev = ANDROID_LOG_INFO;
break;
case eLogWarn:
ss << "[WRN] ";
loglev = ANDROID_LOG_WARN;
break;
case eLogError:
ss << "[ERR] ";
loglev = ANDROID_LOG_ERROR;
break;
}
#else
@ -125,14 +130,12 @@ namespace llarp
ss << (char)27 << "[0;0m";
#endif
{
std::unique_lock< std::mutex > lock(_glog.access);
#ifdef ANDROID
__android_log_write(ANDROID_LOG_INFO, "LOKINET", ss.str().c_str());
tag = "LOKINET|" + tag;
__android_log_write(ANDROID_LOG_INFO, tag.c_str(), ss.str().c_str());
#else
llarp::util::Lock lock(_glog.access);
_glog.out << ss.str() << std::endl;
#ifdef SHADOW_TESTNET
_glog.out << "\n" << std::flush;
#endif
#endif
}
}

View File

@ -13,11 +13,16 @@ namespace llarp
struct PathTransferMessage : public IMessage
{
PathID_t P;
service::ProtocolFrame* T = nullptr;
uint64_t V = 0;
service::ProtocolFrame T;
uint64_t V = 0;
TunnelNonce Y;
PathTransferMessage();
PathTransferMessage(const service::ProtocolFrame& f, const PathID_t& p)
: P(p), T(f)
{
Y.Randomize();
}
~PathTransferMessage();
bool

View File

@ -379,12 +379,12 @@ namespace llarp
typedef std::multimap< PathID_t, TransitHop* > TransitHopsMap_t;
typedef std::pair< std::mutex, TransitHopsMap_t > SyncTransitMap_t;
typedef std::pair< util::Mutex, TransitHopsMap_t > SyncTransitMap_t;
// maps path id -> pathset owner of path
typedef std::map< PathID_t, PathSet* > OwnedPathsMap_t;
typedef std::pair< std::mutex, OwnedPathsMap_t > SyncOwnedPathsMap_t;
typedef std::pair< util::Mutex, OwnedPathsMap_t > SyncOwnedPathsMap_t;
llarp_threadpool*
Worker();

View File

@ -24,6 +24,9 @@ struct llarp_pathbuilder_context : public llarp::path::PathSet
void
BuildOne();
void
ManualRebuild(size_t N);
virtual byte_t*
GetTunnelEncryptionSecretKey();
};

View File

@ -9,4 +9,113 @@
#include <condition_variable>
#include <thread>
#endif
#include <memory>
namespace llarp
{
namespace util
{
/// a mutex that does nothing
struct NullMutex
{
};
/// a lock that does nothing
struct NullLock
{
NullLock(NullMutex& mtx)
{
}
};
/// a condition variable that does nothing
struct NullCondition
{
void
wait(NullLock& l)
{
}
void
notify_one()
{
}
void
notify_all()
{
}
template < typename Interval >
void
wait_for(NullLock& l, Interval i)
{
std::this_thread::sleep_for(i);
}
};
#ifdef SHADOW_TESTNET
typedef NullMutex mtx_t;
typedef NullLock lock_t;
typedef NullCondition cond_t;
#else
typedef std::mutex mtx_t;
typedef std::unique_lock< std::mutex > lock_t;
typedef std::condition_variable cond_t;
#endif
struct Mutex
{
mtx_t impl;
};
/// aqcuire a lock on a mutex
struct Lock
{
Lock(Mutex& mtx) : impl(mtx.impl)
{
}
lock_t impl;
};
struct Condition
{
cond_t impl;
void
NotifyAll()
{
impl.notify_all();
}
void
NotifyOne()
{
impl.notify_one();
}
void
Wait(Lock& lock)
{
impl.wait(lock.impl);
}
template < typename Interval >
void
WaitFor(Lock& lock, Interval i)
{
impl.wait_for(lock.impl, i);
}
template < typename Pred >
void
WaitUntil(Lock& lock, Pred p)
{
impl.wait(lock.impl, p);
}
};
} // namespace util
} // namespace llarp
#endif

View File

@ -1,9 +0,0 @@
#include <llarp/messages/discard.hpp>
namespace llarp
{
DiscardMessage::~DiscardMessage()
{
llarp::LogDebug("~DiscardMessage");
}
} // namespace llarp

View File

@ -78,7 +78,7 @@ namespace llarp
IHopHandler*
MapGet(Map_t& map, const Key_t& k, CheckValue_t check, GetFunc_t get)
{
std::unique_lock< std::mutex > lock(map.first);
util::Lock lock(map.first);
auto range = map.second.equal_range(k);
for(auto i = range.first; i != range.second; ++i)
{
@ -92,7 +92,7 @@ namespace llarp
bool
MapHas(Map_t& map, const Key_t& k, CheckValue_t check)
{
std::unique_lock< std::mutex > lock(map.first);
util::Lock lock(map.first);
auto range = map.second.equal_range(k);
for(auto i = range.first; i != range.second; ++i)
{
@ -106,7 +106,7 @@ namespace llarp
void
MapPut(Map_t& map, const Key_t& k, const Value_t& v)
{
std::unique_lock< std::mutex > lock(map.first);
util::Lock lock(map.first);
map.second.insert(std::make_pair(k, v));
}
@ -114,7 +114,7 @@ namespace llarp
void
MapIter(Map_t& map, Visit_t v)
{
std::unique_lock< std::mutex > lock(map.first);
util::Lock lock(map.first);
for(const auto& item : map.second)
v(item);
}
@ -123,7 +123,7 @@ namespace llarp
void
MapDel(Map_t& map, const Key_t& k, Check_t check)
{
std::unique_lock< std::mutex > lock(map.first);
util::Lock lock(map.first);
auto range = map.second.equal_range(k);
for(auto i = range.first; i != range.second;)
{
@ -185,7 +185,7 @@ namespace llarp
PathContext::GetLocalPathSet(const PathID_t& id)
{
auto& map = m_OurPaths;
std::unique_lock< std::mutex > lock(map.first);
util::Lock lock(map.first);
auto itr = map.second.find(id);
if(itr != map.second.end())
{
@ -216,7 +216,7 @@ namespace llarp
void
PathContext::ExpirePaths()
{
std::unique_lock< std::mutex > lock(m_TransitPaths.first);
util::Lock lock(m_TransitPaths.first);
auto now = llarp_time_now_ms();
auto& map = m_TransitPaths.second;
auto itr = map.begin();
@ -276,12 +276,14 @@ namespace llarp
return h;
RouterID us(OurRouterID());
auto& map = m_TransitPaths;
std::unique_lock< std::mutex > lock(map.first);
auto range = map.second.equal_range(id);
for(auto i = range.first; i != range.second; ++i)
{
if(i->second->info.upstream == us)
return i->second;
util::Lock lock(map.first);
auto range = map.second.equal_range(id);
for(auto i = range.first; i != range.second; ++i)
{
if(i->second->info.upstream == us)
return i->second;
}
}
return nullptr;
}

View File

@ -219,6 +219,13 @@ llarp_pathbuilder_context::BuildOne()
llarp_pathbuilder_build_path(job);
}
void
llarp_pathbuilder_context::ManualRebuild(size_t num)
{
while(num--)
BuildOne();
}
struct llarp_pathbuilder_context*
llarp_pathbuilder_context_new(struct llarp_router* router,
struct llarp_dht_context* dht, size_t sz,

View File

@ -53,6 +53,9 @@ namespace llarp
case 'T':
self->msg = new PathTransferMessage();
break;
case 'H':
self->msg = new service::ProtocolFrame();
break;
default:
llarp::LogError("invalid routing message id: ", *strbuf.cur);
}

View File

@ -22,18 +22,13 @@ namespace llarp
return false;
if(!BEncodeMaybeReadDictInt("S", S, read, key, val))
return false;
if(llarp_buffer_eq(key, "T"))
{
if(T)
delete T;
T = new service::ProtocolFrame();
return T->BDecode(val);
}
if(!BEncodeMaybeReadDictEntry("T", T, read, key, val))
return false;
if(!BEncodeMaybeReadDictInt("V", V, read, key, val))
return false;
if(!BEncodeMaybeReadDictEntry("Y", Y, read, key, val))
return false;
return false;
return read;
}
bool
@ -49,9 +44,7 @@ namespace llarp
if(!BEncodeWriteDictInt("S", S, buf))
return false;
if(!bencode_write_bytestring(buf, "T", 1))
return false;
if(!T->BEncode(buf))
if(!BEncodeWriteDictEntry("T", T, buf))
return false;
if(!BEncodeWriteDictInt("V", LLARP_PROTO_VERSION, buf))
@ -66,31 +59,7 @@ namespace llarp
PathTransferMessage::HandleMessage(IMessageHandler* h,
llarp_router* r) const
{
auto path = r->paths.GetByUpstream(r->pubkey(), P);
if(!path)
{
llarp::LogWarn("No such path for path transfer pathid=", P);
return false;
}
if(!T)
{
llarp::LogError("no data to transfer on data message");
return false;
}
byte_t tmp[service::MAX_PROTOCOL_MESSAGE_SIZE];
auto buf = llarp::StackBuffer< decltype(tmp) >(tmp);
if(!T->BEncode(&buf))
{
llarp::LogWarn("failed to transfer data message, encode failed");
return false;
}
// rewind
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
// send
llarp::LogInfo("Transfer ", buf.sz, " bytes", " to ", P);
return path->HandleDownstream(buf, Y, r);
return h->HandlePathTransferMessage(this, r);
}
} // namespace routing

View File

@ -737,6 +737,7 @@ namespace llarp
selectedIntro = intro;
}
}
ManualRebuild(numHops);
}
void
@ -797,6 +798,10 @@ namespace llarp
AsyncIntroGen* self = static_cast< AsyncIntroGen* >(user);
// randomize Nounce
self->frame.N.Randomize();
// ephemeral public key
SecretKey ephem;
self->crypto->encryption_keygen(ephem);
self->frame.H = llarp::seckey_topublic(ephem);
// randomize tag
self->msg.tag.Randomize();
// set sender
@ -805,8 +810,8 @@ namespace llarp
self->msg.introReply = self->intro;
// derive session key
self->crypto->dh_server(self->sharedKey,
self->remote.EncryptionPublicKey(),
self->m_LocalIdentity->enckey, self->frame.N);
self->remote.EncryptionPublicKey(), ephem,
self->frame.N);
// encrypt and sign
self->frame.EncryptAndSign(self->crypto, &self->msg, self->sharedKey,
@ -847,10 +852,7 @@ namespace llarp
auto path = GetPathByRouter(selectedIntro.router);
if(path)
{
routing::PathTransferMessage transfer;
transfer.T = &msg;
transfer.Y.Randomize();
transfer.P = selectedIntro.pathID;
routing::PathTransferMessage transfer(msg, selectedIntro.pathID);
llarp::LogInfo("sending frame via ", path->Upstream(), " to ",
path->Endpoint(), " for ", Name());
path->SendRoutingMessage(&transfer, m_Parent->Router());
@ -908,9 +910,7 @@ namespace llarp
Endpoint::OutboundContext::SelectHop(llarp_nodedb* db, llarp_rc* prev,
llarp_rc* cur, size_t hop)
{
// TODO: don't hard code
llarp::LogInfo("Select hop ", hop);
if(hop == 3)
if(hop == numHops - 1)
{
auto localcopy = llarp_nodedb_get_rc(db, selectedIntro.router);
if(localcopy)
@ -925,6 +925,7 @@ namespace llarp
"cannot build aligned path, don't have router for "
"introduction ",
selectedIntro);
m_Parent->EnsureRouterIsKnown(selectedIntro.router);
return false;
}
}
@ -956,20 +957,20 @@ namespace llarp
}
auto crypto = m_Parent->Crypto();
SharedSecret shared;
ProtocolFrame f;
routing::PathTransferMessage msg;
ProtocolFrame& f = msg.T;
f.N.Randomize();
f.T = *tags.begin();
f.S = m_Parent->GetSeqNoForConvo(f.T);
if(m_Parent->m_DataHandler->GetCachedSessionKeyFor(f.T, shared))
{
ProtocolMessage msg;
msg.introReply = selectedIntro;
msg.sender = m_Parent->m_Identity.pub;
msg.PutBuffer(payload);
ProtocolMessage m;
m.introReply = selectedIntro;
m.sender = m_Parent->m_Identity.pub;
m.PutBuffer(payload);
if(!f.EncryptAndSign(crypto, &msg, shared,
if(!f.EncryptAndSign(crypto, &m, shared,
m_Parent->m_Identity.signkey))
{
llarp::LogError("failed to sign");
@ -982,10 +983,8 @@ namespace llarp
return;
}
routing::PathTransferMessage msg;
msg.P = selectedIntro.pathID;
msg.Y.Randomize();
msg.T = &f;
if(!path->SendRoutingMessage(&msg, m_Parent->Router()))
{
llarp::LogWarn("Failed to send routing message for data");

View File

@ -1,6 +1,7 @@
#include <llarp/routing/handler.hpp>
#include <llarp/service/protocol.hpp>
#include "buffer.hpp"
#include "mem.hpp"
namespace llarp
{
@ -94,6 +95,8 @@ namespace llarp
{
if(!bencode_start_dict(buf))
return false;
if(!BEncodeWriteDictMsgType(buf, "A", "H"))
return false;
if(!BEncodeWriteDictEntry("D", D, buf))
return false;
if(S == 0)
@ -121,6 +124,15 @@ namespace llarp
ProtocolFrame::DecodeKey(llarp_buffer_t key, llarp_buffer_t* val)
{
bool read = false;
if(llarp_buffer_eq(key, "A"))
{
llarp_buffer_t strbuf;
if(!bencode_read_string(val, &strbuf))
return false;
if(strbuf.sz != 1)
return false;
return *strbuf.cur == 'H';
}
if(!BEncodeMaybeReadDictEntry("D", D, read, key, val))
return false;
if(!BEncodeMaybeReadDictEntry("H", H, read, key, val))
@ -207,7 +219,9 @@ namespace llarp
SharedSecret shared;
if(!crypto->dh_client(shared, self->H, self->localSecret, self->N))
{
llarp::LogError("Failed to derive shared secret for initial message");
llarp::LogError(
"Failed to derive shared secret for initial message H=", self->H,
" N=", self->N);
delete self->msg;
delete self;
return;
@ -217,6 +231,7 @@ namespace llarp
if(!self->msg->BDecode(buf))
{
llarp::LogError("failed to decode inner protocol message");
llarp::DumpBuffer(*buf);
delete self->msg;
delete self;
return;

View File

@ -52,10 +52,18 @@ namespace llarp
llarp_thread_job *job;
{
lock_t lock(this->queue_mutex);
this->condition.wait(
this->condition.WaitUntil(
lock, [this] { return this->stop || !this->jobs.empty(); });
if(this->stop && this->jobs.empty())
if(this->stop)
{
// discard pending jobs
while(this->jobs.size())
{
delete this->jobs.top().job;
this->jobs.pop();
}
return;
}
job = this->jobs.top().job;
this->jobs.pop();
}
@ -75,7 +83,7 @@ namespace llarp
lock_t lock(queue_mutex);
stop = true;
}
condition.notify_all();
condition.NotifyAll();
}
void
@ -84,7 +92,7 @@ namespace llarp
for(auto &t : threads)
t.join();
threads.clear();
done.notify_all();
done.NotifyAll();
}
void
@ -99,7 +107,7 @@ namespace llarp
jobs.emplace(ids++, new llarp_thread_job(job.user, job.work));
}
condition.notify_one();
condition.NotifyOne();
}
static int
@ -186,7 +194,7 @@ struct llarp_threadpool
{
llarp::thread::Pool *impl;
std::mutex m_access;
llarp::util::Mutex m_access;
std::queue< llarp_thread_job * > jobs;
llarp_threadpool(int workers, const char *name, bool isolate,
@ -250,12 +258,12 @@ llarp_threadpool_stop(struct llarp_threadpool *pool)
void
llarp_threadpool_wait(struct llarp_threadpool *pool)
{
std::mutex mtx;
llarp::util::Mutex mtx;
llarp::LogDebug("threadpool wait");
if(pool->impl)
{
std::unique_lock< std::mutex > lock(mtx);
pool->impl->done.wait(lock);
llarp::util::Lock lock(mtx);
pool->impl->done.Wait(lock);
}
}
@ -271,7 +279,7 @@ llarp_threadpool_queue_job(struct llarp_threadpool *pool,
j->work = job.work;
j->user = job.user;
{
std::unique_lock< std::mutex > lock(pool->m_access);
llarp::util::Lock lock(pool->m_access);
pool->jobs.push(j);
}
}
@ -284,7 +292,7 @@ llarp_threadpool_tick(struct llarp_threadpool *pool)
{
llarp_thread_job *job;
{
std::unique_lock< std::mutex > lock(pool->m_access);
llarp::util::Lock lock(pool->m_access);
job = pool->jobs.front();
pool->jobs.pop();
}

View File

@ -14,8 +14,8 @@ namespace llarp
{
namespace thread
{
typedef std::mutex mtx_t;
typedef std::unique_lock< mtx_t > lock_t;
typedef util::Mutex mtx_t;
typedef util::Lock lock_t;
struct Pool
{
virtual void
@ -49,8 +49,8 @@ namespace llarp
std::priority_queue< Job_t > jobs;
uint32_t ids = 0;
mtx_t queue_mutex;
std::condition_variable condition;
std::condition_variable done;
util::Condition condition;
util::Condition done;
bool stop;
};

View File

@ -55,11 +55,11 @@ namespace llarp
struct llarp_timer_context
{
std::mutex timersMutex;
llarp::util::Mutex timersMutex;
std::unordered_map< uint32_t, llarp::timer* > timers;
std::priority_queue< llarp::timer* > calling;
std::mutex tickerMutex;
std::condition_variable* ticker = nullptr;
llarp::util::Mutex tickerMutex;
llarp::util::Condition* ticker = nullptr;
std::chrono::milliseconds nextTickLen = std::chrono::milliseconds(100);
uint32_t ids = 0;
@ -86,7 +86,7 @@ struct llarp_timer_context
void
cancel(uint32_t id)
{
std::unique_lock< std::mutex > lock(timersMutex);
llarp::util::Lock lock(timersMutex);
auto itr = timers.find(id);
if(itr == timers.end())
return;
@ -96,7 +96,7 @@ struct llarp_timer_context
void
remove(uint32_t id)
{
std::unique_lock< std::mutex > lock(timersMutex);
llarp::util::Lock lock(timersMutex);
auto itr = timers.find(id);
if(itr == timers.end())
return;
@ -107,7 +107,7 @@ struct llarp_timer_context
uint32_t
call_later(void* user, llarp_timer_handler_func func, uint64_t timeout_ms)
{
std::unique_lock< std::mutex > lock(timersMutex);
llarp::util::Lock lock(timersMutex);
uint32_t id = ++ids;
timers[id] = new llarp::timer(timeout_ms, user, func);
return id;
@ -119,7 +119,7 @@ struct llarp_timer_context
std::list< uint32_t > ids;
{
std::unique_lock< std::mutex > lock(timersMutex);
llarp::util::Lock lock(timersMutex);
for(auto& item : timers)
{
@ -169,7 +169,7 @@ llarp_timer_stop(struct llarp_timer_context* t)
t->timers.clear();
t->stop();
if(t->ticker)
t->ticker->notify_all();
t->ticker->NotifyAll();
}
void
@ -221,19 +221,19 @@ llarp_timer_tick_all_async(struct llarp_timer_context* t,
void
llarp_timer_run(struct llarp_timer_context* t, struct llarp_threadpool* pool)
{
t->ticker = new std::condition_variable;
t->ticker = new llarp::util::Condition();
while(t->run())
{
// wait for timer mutex
if(t->ticker)
{
std::unique_lock< std::mutex > lock(t->tickerMutex);
t->ticker->wait_for(lock, t->nextTickLen);
llarp::util::Lock lock(t->tickerMutex);
t->ticker->WaitFor(lock, t->nextTickLen);
}
if(t->run())
{
std::unique_lock< std::mutex > lock(t->timersMutex);
llarp::util::Lock lock(t->timersMutex);
// we woke up
llarp_timer_tick_all_async(t, pool);
}

View File

@ -133,7 +133,26 @@ namespace llarp
TransitHop::HandlePathTransferMessage(
const llarp::routing::PathTransferMessage* msg, llarp_router* r)
{
return false;
auto path = r->paths.GetByUpstream(r->pubkey(), msg->P);
if(!path)
{
llarp::LogWarn("No such path for path transfer pathid=", msg->P);
return false;
}
byte_t tmp[service::MAX_PROTOCOL_MESSAGE_SIZE];
auto buf = llarp::StackBuffer< decltype(tmp) >(tmp);
if(!msg->T.BEncode(&buf))
{
llarp::LogWarn("failed to transfer data message, encode failed");
return false;
}
// rewind0
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
// send
llarp::LogInfo("Transfer ", buf.sz, " bytes", " to ", msg->P);
return path->HandleDownstream(buf, msg->Y, r);
}
} // namespace path