Merge pull request #601 from majestrate/master

v0.4.2
This commit is contained in:
Jeff 2019-05-11 10:50:01 -04:00 committed by GitHub
commit 726b5e381b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 408 additions and 96 deletions

View File

@ -171,7 +171,7 @@ testnet-build: testnet-configure
testnet:
cp $(EXE) $(TESTNET_EXE)
mkdir -p $(TESTNET_ROOT)
$(PYTHON) $(REPO)/contrib/testnet/genconf.py --bin=$(TESTNET_EXE) --svc=$(TESTNET_SERVERS) --clients=$(TESTNET_CLIENTS) --dir=$(TESTNET_ROOT) --out $(TESTNET_CONF) --connect=4 --ifname=$(TESTNET_IFNAME) --baseport=$(TESTNET_BASEPORT) --ip=$(TESTNET_IP) --netid=$(TESTNET_NETID)
$(PYTHON) $(REPO)/contrib/testnet/genconf.py --bin=$(TESTNET_EXE) --svc=$(TESTNET_SERVERS) --clients=$(TESTNET_CLIENTS) --dir=$(TESTNET_ROOT) --out $(TESTNET_CONF) --ifname=$(TESTNET_IFNAME) --baseport=$(TESTNET_BASEPORT) --ip=$(TESTNET_IP) --netid=$(TESTNET_NETID)
LLARP_DEBUG=$(TESTNET_DEBUG) supervisord -n -d $(TESTNET_ROOT) -l $(TESTNET_LOG) -c $(TESTNET_CONF)
$(TEST_EXE): debug

View File

@ -45,7 +45,8 @@ def main():
config['router'] = {
'net-threads': '1',
'worker-threads': '4',
'nickname': svcNodeName(nodeid)
'nickname': svcNodeName(nodeid),
'min-connections': "{}".format(args.connect)
}
if args.netid:
config['router']['netid'] = args.netid
@ -72,11 +73,8 @@ def main():
fp = os.path.join(d, 'daemon.ini')
with open(fp, 'w') as f:
config.write(f)
if nodeid == 0:
otherID = 1
else:
otherID = nodeid - 1
f.write("[bootstrap]\nadd-node={}\n".format(os.path.join(basedir,svcNodeName(otherID), 'rc.signed')))
for n in range(args.connect):
f.write("[bootstrap]\nadd-node={}\n".format(os.path.join(basedir,svcNodeName((nodeid + 1 + n) % args.svc), 'rc.signed')))
for nodeid in range(args.clients):
@ -106,14 +104,12 @@ def main():
config['services'] = {
'testnet': hiddenservice
}
fp = os.path.join(d, 'daemon.ini')
fp = os.path.join(d, 'client.ini')
with open(fp, 'w') as f:
config.write(f)
if nodeid == 0:
otherID = 1
else:
otherID = nodeid - 1
f.write("[bootstrap]\nadd-node={}\n".format(os.path.join(basedir,svcNodeName(otherID), 'rc.signed')))
for n in range(args.connect):
otherID = (n + nodeid) % args.svc
f.write("[bootstrap]\nadd-node={}\n".format(os.path.join(basedir,svcNodeName(otherID), 'rc.signed')))
with open(hiddenservice, 'w') as f:
f.write('''[test-service]
tag=test
@ -133,9 +129,9 @@ stdout_logfile_maxbytes=0
process_name = svc-node-%(process_num)03d
numprocs = {}
'''.format(os.path.join(args.dir, 'svc-node-%(process_num)03d'), exe, args.dir, args.svc))
f.write('''[program:client-node]
f.write('''[program:Client-node]
directory = {}
command = {} daemon.ini
command = bash -c "sleep 5 && {} client.ini"
autorestart=true
redirect_stderr=true
#stdout_logfile=/dev/fd/1

View File

@ -648,7 +648,7 @@ namespace llarp
// ourKey should never be in the connected list
// requester is likely in the connected list
// 4 or connection nodes (minus a potential requestor), whatever is less
const size_t want = std::min(size_t(4), nodeCount - 1);
const size_t want = std::min(size_t(4), nodeCount);
llarp::LogDebug("We want ", want, " connected nodes in the DHT");
if(!_nodes->GetManyNearExcluding(t, found, want,
std::set< Key_t >{ourKey, requester}))

View File

@ -57,12 +57,23 @@ namespace llarp
return NumPathsExistingAt(future) < expect && !BuildCooldownHit(now);
}
void
BaseSession::BlacklistSnode(const RouterID snode)
{
m_SnodeBlacklist.insert(std::move(snode));
}
bool
BaseSession::SelectHop(llarp_nodedb* db, const std::set< RouterID >& prev,
RouterContact& cur, size_t hop,
llarp::path::PathRole roles)
{
std::set< RouterID > exclude = prev;
for(const auto& snode : m_SnodeBlacklist)
{
if(snode != m_ExitRouter)
exclude.insert(snode);
}
exclude.insert(m_ExitRouter);
if(hop == numHops - 1)
{

View File

@ -39,6 +39,9 @@ namespace llarp
return shared_from_this();
}
void
BlacklistSnode(const RouterID snode);
util::StatusObject
ExtractStatus() const;
@ -129,6 +132,8 @@ namespace llarp
uint64_t seqno);
private:
std::set< RouterID > m_SnodeBlacklist;
using UpstreamTrafficQueue_t =
std::deque< llarp::routing::TransferTrafficMessage >;
using TieredQueue_t = std::map< uint8_t, UpstreamTrafficQueue_t >;

View File

@ -495,6 +495,11 @@ namespace llarp
llarp::LogWarn("Couldn't start endpoint");
return false;
}
if(m_Exit)
{
for(const auto &snode : m_SnodeBlacklist)
m_Exit->BlacklistSnode(snode);
}
return SetupNetworking();
}

View File

@ -173,9 +173,14 @@ namespace llarp
routing::IMessageHandler *h,
const PathID_t &rxid) = 0;
/// count the number of service nodes we are connected to
virtual size_t
NumberOfConnectedRouters() const = 0;
/// count the number of clients that are connected to us
virtual size_t
NumberOfConnectedClients() const = 0;
virtual bool
GetRandomConnectedRouter(RouterContact &result) const = 0;

View File

@ -39,6 +39,7 @@ namespace llarp
struct TryConnectJob
{
llarp_time_t lastAttempt = 0;
llarp::RouterContact rc;
llarp::ILinkLayer *link;
llarp::Router *router;
@ -53,6 +54,13 @@ struct TryConnectJob
{
}
bool
TimeoutReached() const
{
const auto now = router->Now();
return now > lastAttempt && now - lastAttempt > 5000;
}
void
Failed()
{
@ -69,13 +77,12 @@ struct TryConnectJob
router->FlushOutboundFor(rc.pubkey, link);
}
void
AttemptTimedout()
bool
Timeout()
{
if(ShouldRetry())
{
Attempt();
return;
return Attempt();
}
router->routerProfiling().MarkConnectTimeout(rc.pubkey);
if(router->routerProfiling().IsBad(rc.pubkey))
@ -83,19 +90,19 @@ struct TryConnectJob
if(!router->IsBootstrapNode(rc.pubkey))
router->nodedb()->Remove(rc.pubkey);
}
// delete this
router->pendingEstablishJobs.erase(rc.pubkey);
return true;
}
void
bool
Attempt()
{
--triesLeft;
if(!link->TryEstablishTo(rc))
{
// delete this
router->pendingEstablishJobs.erase(rc.pubkey);
return true;
}
lastAttempt = router->Now();
return false;
}
bool
@ -110,7 +117,8 @@ on_try_connecting(void *u)
{
TryConnectJob *j = static_cast< TryConnectJob * >(u);
j->Attempt();
if(!j->Attempt())
j->router->pendingEstablishJobs.erase(j->rc.pubkey);
}
bool
@ -285,7 +293,7 @@ namespace llarp
if(whitelistRouters)
{
const auto sz = lokinetRouters.size();
auto itr = lokinetRouters.begin();
auto itr = lokinetRouters.begin();
if(sz == 0)
return false;
if(sz > 1)
@ -689,7 +697,8 @@ namespace llarp
auto itr = pendingEstablishJobs.find(session->GetPubKey());
if(itr != pendingEstablishJobs.end())
{
itr->second->AttemptTimedout();
if(itr->second->Timeout())
pendingEstablishJobs.erase(itr);
}
}
@ -711,12 +720,46 @@ namespace llarp
}
}
size_t
Router::NumberOfRoutersMatchingFilter(
std::function< bool(const ILinkSession *) > filter) const
{
std::set< RouterID > connected;
ForEachPeer([&](const auto *link, bool) {
if(filter(link))
connected.insert(link->GetPubKey());
});
return connected.size();
}
size_t
Router::NumberOfConnectedRouters() const
{
size_t s = 0;
ForEachPeer([&s](const auto *, bool) { ++s; });
return s;
return NumberOfRoutersMatchingFilter([&](const ILinkSession *link) -> bool {
const RouterContact rc(link->GetRemoteRC());
return rc.IsPublicRouter() && ConnectionToRouterAllowed(rc.pubkey);
});
}
size_t
Router::NumberOfConnectedClients() const
{
return NumberOfRoutersMatchingFilter([&](const ILinkSession *link) -> bool {
const RouterContact rc(link->GetRemoteRC());
return !rc.IsPublicRouter();
});
}
size_t
Router::NumberOfConnectionsMatchingFilter(
std::function< bool(const ILinkSession *) > filter) const
{
size_t sz = 0;
ForEachPeer([&](const auto *link, bool) {
if(filter(link))
++sz;
});
return sz;
}
bool
@ -922,7 +965,7 @@ namespace llarp
if(StrEq(key, "file"))
{
LogInfo("open log file: ", val);
FILE *logfile = ::fopen(val, "a");
FILE *const logfile = ::fopen(val, "a");
if(logfile)
{
LogContext::Instance().logStream =
@ -983,18 +1026,21 @@ namespace llarp
|| (StrEq(section, "bootstrap") && StrEq(key, "add-node")))
{
// llarp::LogDebug("connect section has ", key, "=", val);
bootstrapRCList.emplace_back();
auto &rc = bootstrapRCList.back();
RouterContact rc;
if(!rc.Read(val))
{
llarp::LogWarn("failed to decode bootstrap RC, file='", val,
"' rc=", rc);
bootstrapRCList.pop_back();
;
return;
}
if(rc.Verify(crypto(), Now()))
{
llarp::LogInfo("Added bootstrap node ", RouterID(rc.pubkey));
const auto result = bootstrapRCList.insert(std::move(rc));
if(result.second)
llarp::LogInfo("Added bootstrap node ", RouterID(rc.pubkey));
else
llarp::LogWarn("Duplicate bootstrap node ", RouterID(rc.pubkey));
}
else
{
@ -1007,7 +1053,6 @@ namespace llarp
{
llarp::LogError("malformed rc file='", val, "' rc=", rc);
}
bootstrapRCList.pop_back();
}
}
else if(StrEq(section, "router"))
@ -1155,14 +1200,12 @@ namespace llarp
}
bool
Router::IsBootstrapNode(RouterID r) const
Router::IsBootstrapNode(const RouterID r) const
{
for(const auto &rc : bootstrapRCList)
{
if(rc.pubkey == r)
return true;
}
return false;
return std::count_if(
bootstrapRCList.begin(), bootstrapRCList.end(),
[r](const RouterContact &rc) -> bool { return rc.pubkey == r; })
> 0;
}
void
@ -1185,6 +1228,11 @@ namespace llarp
LogError("Failed to update our RC");
}
// kill nodes that are not allowed by network policy
nodedb()->RemoveIf([&](const RouterContact &rc) -> bool {
return !ConnectionToRouterAllowed(rc.pubkey);
});
// only do this as service node
// client endpoints do this on their own
nodedb()->visit([&](const RouterContact &rc) -> bool {
@ -1197,14 +1245,29 @@ namespace llarp
{
// kill dead nodes if client
nodedb()->RemoveIf([&](const RouterContact &rc) -> bool {
// don't kill first hop nodes
if(strictConnectPubkeys.count(rc.pubkey))
return false;
// don't kill "non-bad" nodes
if(!routerProfiling().IsBad(rc.pubkey))
return false;
routerProfiling().ClearProfile(rc.pubkey);
// don't kill bootstrap nodes
return !IsBootstrapNode(rc.pubkey);
});
}
// expire transit paths
paths.ExpirePaths(now);
{
auto itr = pendingEstablishJobs.begin();
while(itr != pendingEstablishJobs.end())
{
if(itr->second->TimeoutReached() && itr->second->Timeout())
itr = pendingEstablishJobs.erase(itr);
else
++itr;
}
}
{
auto itr = m_PersistingSessions.begin();
@ -1227,8 +1290,14 @@ namespace llarp
}
else
{
LogInfo("commit to ", itr->first, " expired");
const RouterID r(itr->first);
LogInfo("commit to ", r, " expired");
itr = m_PersistingSessions.erase(itr);
// close all the session because the commit to this router expired
ForEachPeer([&](ILinkSession *s) {
if(s->GetPubKey() == r)
s->Close();
});
}
}
}

View File

@ -246,7 +246,7 @@ namespace llarp
std::set< RouterID > strictConnectPubkeys;
/// bootstrap RCs
std::list< RouterContact > bootstrapRCList;
std::set< RouterContact > bootstrapRCList;
bool
ExitEnabled() const
@ -501,9 +501,24 @@ namespace llarp
void
ConnectToRandomRouters(int N) override;
/// count the number of unique service nodes connected via pubkey
size_t
NumberOfConnectedRouters() const override;
/// count the number of unique clients connected by pubkey
size_t
NumberOfConnectedClients() const override;
/// count unique router id's given filter to match session
size_t
NumberOfRoutersMatchingFilter(
std::function< bool(const ILinkSession *) > filter) const;
/// count the number of connections that match filter
size_t
NumberOfConnectionsMatchingFilter(
std::function< bool(const ILinkSession *) > filter) const;
bool
TryConnectAsync(RouterContact rc, uint16_t tries) override;

View File

@ -65,6 +65,22 @@ namespace llarp
{
m_BundleRC = IsTrueValue(v.c_str());
}
if(k == "blacklist-snode")
{
RouterID snode;
if(!snode.FromString(v))
{
LogError(Name(), " invalid snode value: ", v);
return false;
}
const auto result = m_SnodeBlacklist.insert(snode);
if(!result.second)
{
LogError(Name(), " duplicate blacklist-snode: ", snode.ToString());
return false;
}
LogInfo(Name(), " adding ", snode.ToString(), " to blacklist");
}
if(k == "on-up")
{
m_OnUp = hooks::ExecShellBackend(v);
@ -659,6 +675,17 @@ namespace llarp
self->m_IsolatedLogic);
}
bool
Endpoint::SelectHop(llarp_nodedb* db, const std::set< RouterID >& prev,
RouterContact& cur, size_t hop, path::PathRole roles)
{
std::set< RouterID > exclude = prev;
for(const auto& snode : m_SnodeBlacklist)
exclude.insert(snode);
return path::Builder::SelectHop(db, exclude, cur, hop, roles);
}
bool
Endpoint::ShouldBundleRC() const
{
@ -701,35 +728,50 @@ namespace llarp
m_PendingServiceLookups.erase(addr);
}
void
Endpoint::HandleVerifyGotRouter(dht::GotRouterMessage_constptr msg,
llarp_async_verify_rc* j)
{
auto itr = m_PendingRouters.find(msg->R[0].pubkey);
if(itr != m_PendingRouters.end())
{
if(j->valid)
itr->second.InformResult(msg->R);
else
itr->second.InformResult({});
m_PendingRouters.erase(itr);
}
delete j;
}
bool
Endpoint::HandleGotRouterMessage(dht::GotRouterMessage_constptr msg)
{
auto itr = m_PendingRouters.find(msg->R[0].pubkey);
if(itr == m_PendingRouters.end())
return false;
if(msg->R.size() == 1)
if(msg->R.size())
{
llarp_async_verify_rc* job = new llarp_async_verify_rc;
job->nodedb = m_Router->nodedb();
job->cryptoworker = m_Router->threadpool();
job->diskworker = m_Router->diskworker();
job->logic = m_Router->logic();
job->hook = [=](llarp_async_verify_rc* j) {
auto i = m_PendingRouters.find(msg->R[0].pubkey);
if(j->valid)
i->second.InformResult(msg->R);
else
i->second.InformResult({});
m_PendingRouters.erase(i);
delete j;
};
job->rc = msg->R[0];
job->hook = std::bind(&Endpoint::HandleVerifyGotRouter, this, msg,
std::placeholders::_1);
job->rc = msg->R[0];
llarp_nodedb_async_verify(job);
}
else
{
itr->second.InformResult({});
m_PendingRouters.erase(itr);
auto itr = m_PendingRouters.begin();
while(itr != m_PendingRouters.end())
{
if(itr->second.txid == msg->txid)
{
itr->second.InformResult({});
itr = m_PendingRouters.erase(itr);
}
else
++itr;
}
}
return true;
}

View File

@ -1,6 +1,7 @@
#ifndef LLARP_SERVICE_ENDPOINT_HPP
#define LLARP_SERVICE_ENDPOINT_HPP
#include <dht/messages/gotrouter.hpp>
#include <ev/ev.h>
#include <exit/session.hpp>
#include <net/net.hpp>
@ -21,6 +22,8 @@
#define MIN_SHIFT_INTERVAL (5 * 1000)
#endif
struct llarp_async_verify_rc;
namespace llarp
{
namespace service
@ -291,6 +294,10 @@ namespace llarp
uint64_t
GetSeqNoForConvo(const ConvoTag& tag);
virtual bool
SelectHop(llarp_nodedb* db, const std::set< RouterID >& prev,
RouterContact& cur, size_t hop, path::PathRole roles) override;
virtual void
IntroSetPublishFail();
virtual void
@ -322,6 +329,10 @@ namespace llarp
RunIsolatedMainLoop(void*);
private:
void
HandleVerifyGotRouter(dht::GotRouterMessage_constptr msg,
llarp_async_verify_rc* j);
bool
OnLookup(const service::Address& addr, const IntroSet* i,
const RouterID& endpoint); /* */
@ -346,6 +357,9 @@ namespace llarp
return false;
}
public:
std::set< RouterID > m_SnodeBlacklist;
protected:
IDataHandler* m_DataHandler = nullptr;
Identity m_Identity;
@ -374,11 +388,11 @@ namespace llarp
{
if(now < started)
return false;
return now - started > 5000;
return now - started > 30000;
}
void
InformResult(const std::vector< RouterContact >& result)
InformResult(std::vector< RouterContact > result)
{
if(handler)
handler(result);

View File

@ -305,6 +305,8 @@ namespace llarp
}
std::set< RouterID > exclude = prev;
exclude.insert(m_NextIntro.router);
for(const auto& snode : m_Endpoint->m_SnodeBlacklist)
exclude.insert(snode);
if(hop == numHops - 1)
{
m_Endpoint->EnsureRouterIsKnown(m_NextIntro.router);
@ -394,6 +396,8 @@ namespace llarp
{
if(intro.ExpiresSoon(now))
continue;
if(m_Endpoint->m_SnodeBlacklist.count(intro.router))
continue;
if(m_BadIntros.find(intro) == m_BadIntros.end()
&& remoteIntro.router == intro.router)
{
@ -407,14 +411,16 @@ namespace llarp
/// pick newer intro not on same router
for(const auto& intro : currentIntroSet.I)
{
if(m_Endpoint->m_SnodeBlacklist.count(intro.router))
continue;
m_Endpoint->EnsureRouterIsKnown(intro.router);
if(intro.ExpiresSoon(now))
continue;
if(m_BadIntros.find(intro) == m_BadIntros.end() && m_NextIntro != intro)
{
shifted = intro.router != m_NextIntro.router;
if(intro.expiresAt > m_NextIntro.expiresAt)
{
shifted = intro.router != m_NextIntro.router;
m_NextIntro = intro;
success = true;
}

116
llarp/util/alloc.hpp Normal file
View File

@ -0,0 +1,116 @@
#ifndef LLARP_UTIL_ALLOC_HPP
#define LLARP_UTIL_ALLOC_HPP
#include <bitset>
#include <array>
namespace llarp
{
namespace util
{
/// simple single threaded allocatable super type template
template < typename Value_t, std::size_t maxEntries >
struct AllocPool
{
using Ptr_t = Value_t *;
AllocPool()
{
mem = nullptr;
}
~AllocPool()
{
// delete mem;
}
Ptr_t
NewPtr()
{
/*
Ptr_t ptr = mem->allocate();
::new(ptr) Value_t;
return ptr;
*/
return new Value_t();
}
void
DelPtr(Ptr_t p)
{
/*
p->~Value_t();
mem->deallocate(p);
*/
delete p;
}
bool
Full() const
{
/*
return mem->full();
*/
return false;
}
bool
HasRoomFor(size_t numItems)
{
return true;
/* return mem->hasRoomFor(numItems); */
}
private:
struct Memory
{
uint8_t _buffer[maxEntries * sizeof(Value_t)];
std::bitset< maxEntries > _allocated = {0};
std::size_t _pos = 0;
bool
full() const
{
return _allocated.size() == _allocated.count();
}
bool
hasRoomFor(size_t num)
{
return _allocated.count() + num <= _allocated.size();
}
void
deallocate(void *ptr)
{
if(ptr == nullptr)
throw std::bad_alloc();
uint8_t *v_ptr = (uint8_t *)ptr;
const std::size_t _idx = (v_ptr - _buffer) / sizeof(Value_t);
_allocated.reset(_idx);
}
[[nodiscard]] Ptr_t
allocate()
{
const std::size_t _started = _pos;
while(_allocated.test(_pos))
{
_pos = (_pos + 1) % maxEntries;
if(_pos == _started)
{
// we are full
throw std::bad_alloc();
}
}
_allocated.set(_pos);
return (Ptr_t)&_buffer[_pos * sizeof(Value_t)];
}
};
Memory *mem;
};
} // namespace util
} // namespace llarp
#endif

View File

@ -104,6 +104,8 @@ namespace llarp
std::stringstream ss;
log.logStream->PreLog(ss, lvl, fname, lineno);
if(log.nodeName.size())
LogAppend(ss, "[", log.nodeName, "] ");
LogAppend(ss, std::forward< TArgs >(args)...);
log.logStream->PostLog(ss);
log.logStream->Print(lvl, fname, ss.str());

View File

@ -46,7 +46,7 @@ namespace llarp
void
OStreamLogStream::Print(LogLevel, const char*, const std::string& msg)
{
m_Out << msg;
m_Out << msg << std::flush;
}
} // namespace llarp

View File

@ -7,13 +7,13 @@ namespace llarp
namespace utp
{
bool
InboundMessage::IsExpired(llarp_time_t now) const
_InboundMessage::IsExpired(llarp_time_t now) const
{
return now > lastActive && now - lastActive >= 2000;
}
bool
InboundMessage::AppendData(const byte_t* ptr, uint16_t sz)
_InboundMessage::AppendData(const byte_t* ptr, uint16_t sz)
{
if(buffer.size_left() < sz)
return false;
@ -21,6 +21,7 @@ namespace llarp
buffer.cur += sz;
return true;
}
} // namespace utp
} // namespace llarp

View File

@ -9,6 +9,8 @@
#include <string.h>
#include <util/alloc.hpp>
namespace llarp
{
namespace utp
@ -46,7 +48,7 @@ namespace llarp
using MessageBuffer = AlignedBuffer< MAX_LINK_MSG_SIZE >;
/// pending inbound message being received
struct InboundMessage
struct _InboundMessage
{
/// timestamp of last activity
llarp_time_t lastActive;
@ -56,17 +58,6 @@ namespace llarp
/// for accessing message buffer
llarp_buffer_t buffer;
InboundMessage() : lastActive(0), _msg(), buffer(_msg)
{
}
InboundMessage(const InboundMessage& other)
: lastActive(other.lastActive), _msg(other._msg), buffer(_msg)
{
buffer.cur = buffer.base + (other.buffer.cur - other.buffer.base);
buffer.sz = other.buffer.sz;
}
/// return true if this inbound message can be removed due to expiration
bool
IsExpired(llarp_time_t now) const;
@ -77,13 +68,20 @@ namespace llarp
/// return true on success
bool
AppendData(const byte_t* ptr, uint16_t sz);
_InboundMessage() : lastActive(0), _msg(), buffer(_msg)
{
}
};
inline bool
operator==(const InboundMessage& lhs, const InboundMessage& rhs)
operator==(const _InboundMessage& lhs, const _InboundMessage& rhs)
{
return lhs.buffer.base == rhs.buffer.base;
}
using InboundMessage = std::shared_ptr< _InboundMessage >;
} // namespace utp
} // namespace llarp

View File

@ -9,6 +9,12 @@ namespace llarp
{
namespace utp
{
using SendBufferPool = util::AllocPool< FragmentBuffer, 1024 * 4 >;
using RecvBufferPool = util::AllocPool< _InboundMessage, 1024 >;
static SendBufferPool OBPool;
static RecvBufferPool IBPool;
using namespace std::placeholders;
void
@ -35,8 +41,11 @@ namespace llarp
std::vector< utp_iovec > send;
for(const auto& vec : vecq)
{
expect += vec.iov_len;
send.emplace_back(vec);
if(vec.iov_len)
{
expect += vec.iov_len;
send.emplace_back(vec);
}
}
if(expect)
{
@ -72,8 +81,10 @@ namespace llarp
auto itr = m_RecvMsgs.begin();
while(itr != m_RecvMsgs.end())
{
if(itr->second.IsExpired(now))
if(itr->second->IsExpired(now))
{
itr = m_RecvMsgs.erase(itr);
}
else
++itr;
}
@ -264,7 +275,12 @@ namespace llarp
// this means we're stalled
return false;
}
size_t sz = buf.sz;
size_t sz = buf.sz;
if(!OBPool.HasRoomFor(sz / FragmentBodyPayloadSize))
{
LogError("Send buffers are full");
return false;
}
byte_t* ptr = buf.base;
uint32_t msgid = m_NextTXMsgID++;
while(sz)
@ -364,14 +380,15 @@ namespace llarp
uint16_t remaining)
{
sendq.emplace_back();
sendq.emplace_back(OBPool.NewPtr(),
[](FragmentBuffer* ptr) { OBPool.DelPtr(ptr); });
auto& buf = sendq.back();
vecq.emplace_back();
auto& vec = vecq.back();
vec.iov_base = buf.data();
vec.iov_base = buf->data();
vec.iov_len = FragmentBufferSize;
buf.Randomize();
byte_t* noncePtr = buf.data() + FragmentHashSize;
buf->Randomize();
byte_t* noncePtr = buf->data() + FragmentHashSize;
byte_t* body = noncePtr + FragmentNonceSize;
byte_t* base = body;
AlignedBuffer< 24 > A(base);
@ -402,7 +419,7 @@ namespace llarp
payload.cur = payload.base;
payload.sz = FragmentBufferSize - FragmentHashSize;
// key'd hash
if(!OurCrypto()->hmac(buf.data(), payload, txKey))
if(!OurCrypto()->hmac(buf->data(), payload, txKey))
return false;
return MutateKey(txKey, A);
}
@ -539,14 +556,22 @@ namespace llarp
// get message
if(m_RecvMsgs.find(msgid) == m_RecvMsgs.end())
{
m_RecvMsgs.emplace(msgid, InboundMessage{});
if(IBPool.Full())
{
LogError("inbound buffer mempool full");
return false;
}
m_RecvMsgs.emplace(
msgid, InboundMessage(IBPool.NewPtr(), [](_InboundMessage* m) {
IBPool.DelPtr(m);
}));
}
auto itr = m_RecvMsgs.find(msgid);
// add message activity
itr->second.lastActive = parent->Now();
itr->second->lastActive = parent->Now();
// append data
if(!itr->second.AppendData(out.cur, length))
if(!itr->second->AppendData(out.cur, length))
{
LogError("inbound buffer is full");
return false; // not enough room
@ -561,8 +586,8 @@ namespace llarp
if(remaining == 0)
{
// we done with this guy, prune next tick
itr->second.lastActive = 0;
ManagedBuffer buf(itr->second.buffer);
itr->second->lastActive = 0;
ManagedBuffer buf(itr->second->buffer);
// resize
buf.underlying.sz = buf.underlying.cur - buf.underlying.base;
// rewind

View File

@ -14,6 +14,8 @@ namespace llarp
{
struct LinkLayer;
using SendFragmentBuffer = std::shared_ptr< FragmentBuffer >;
struct Session : public ILinkSession
{
/// remote router's rc
@ -42,7 +44,7 @@ namespace llarp
/// send queue for utp
std::deque< utp_iovec > vecq;
/// tx fragment queue
std::deque< FragmentBuffer > sendq;
std::deque< SendFragmentBuffer > sendq;
/// current rx fragment buffer
FragmentBuffer recvBuf;
/// current offset in current rx fragment buffer