// lokinet/llarp/service/endpoint.cpp

#include <chrono>
#include <service/endpoint.hpp>
#include <dht/context.hpp>
#include <dht/messages/findintro.hpp>
#include <dht/messages/findrouter.hpp>
#include <dht/messages/gotintro.hpp>
#include <dht/messages/gotrouter.hpp>
#include <dht/messages/pubintro.hpp>
#include <nodedb.hpp>
#include <profiling.hpp>
#include <router/abstractrouter.hpp>
#include <routing/dht_message.hpp>
#include <routing/path_transfer_message.hpp>
#include <service/endpoint_state.hpp>
#include <service/endpoint_util.hpp>
#include <service/hidden_service_address_lookup.hpp>
#include <service/outbound_context.hpp>
#include <service/protocol.hpp>
#include <util/thread/logic.hpp>
#include <util/str.hpp>
#include <util/buffer.hpp>
#include <util/meta/memfn.hpp>
#include <hook/shell.hpp>
#include <link/link_manager.hpp>

#include <utility>

namespace llarp
{
  namespace service
  {
    Endpoint::Endpoint(const std::string& name, AbstractRouter* r,
                       Context* parent)
        : path::Builder(r, 3, path::default_len)
        , context(parent)
        , m_RecvQueue(128)
    {
      m_state = std::make_unique< EndpointState >();
      m_state->m_Router = r;
      m_state->m_Name = name;
      m_state->m_Tag.Zero();
      m_RecvQueue.enable();
    }

    bool
    Endpoint::SetOption(const std::string& k, const std::string& v)
    {
      return m_state->SetOption(k, v, *this);
    }

    llarp_ev_loop_ptr
    Endpoint::EndpointNetLoop()
    {
      if(m_state->m_IsolatedNetLoop)
        return m_state->m_IsolatedNetLoop;

      return Router()->netloop();
    }

    bool
    Endpoint::NetworkIsIsolated() const
    {
      return m_state->m_IsolatedLogic.get() != nullptr
          && m_state->m_IsolatedNetLoop != nullptr;
    }

    bool
    Endpoint::HasPendingPathToService(const Address& addr) const
    {
      return m_state->m_PendingServiceLookups.find(addr)
          != m_state->m_PendingServiceLookups.end();
    }
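
    // Rebuild our introset from the currently established paths and publish
    // it to the DHT. Intros that expire within path::min_intro_lifetime are
    // filtered out; if no usable intros remain we trigger a path rebuild
    // instead of publishing.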
    void
    Endpoint::RegenAndPublishIntroSet(bool forceRebuild)
    {
      const auto now = llarp::time_now_ms();
      std::set< Introduction > introset;
      if(!GetCurrentIntroductionsWithFilter(
             introset, [now](const service::Introduction& intro) -> bool {
               return not intro.ExpiresSoon(now, path::min_intro_lifetime);
             }))
      {
        LogWarn("could not publish descriptors for endpoint ", Name(),
                " because we couldn't get enough valid introductions");
        if(ShouldBuildMore(now) || forceRebuild)
          ManualRebuild(1);
        return;
      }

      introSet().I.clear();
      for(auto& intro : introset)
      {
        introSet().I.emplace_back(std::move(intro));
      }
      if(introSet().I.size() == 0)
      {
        LogWarn("not enough intros to publish introset for ", Name());
        if(ShouldBuildMore(now) || forceRebuild)
          ManualRebuild(1);
        return;
      }
      introSet().topic = m_state->m_Tag;

      auto maybe = m_Identity.EncryptAndSignIntroSet(introSet(), now);
      if(not maybe.has_value())
      {
        LogWarn("failed to generate introset for endpoint ", Name());
        return;
      }
      if(PublishIntroSet(maybe.value(), Router()))
      {
        LogInfo("(re)publishing introset for endpoint ", Name());
      }
      else
      {
        LogWarn("failed to publish intro set for endpoint ", Name());
      }
    }

    bool
    Endpoint::IsReady() const
    {
      const auto now = Now();
      if(introSet().I.size() == 0)
        return false;
      if(introSet().IsExpired(now))
        return false;
      return true;
    }

    bool
    Endpoint::HasPendingRouterLookup(const RouterID remote) const
    {
      const auto& routers = m_state->m_PendingRouters;
      return routers.find(remote) != routers.end();
    }

    bool
    Endpoint::GetEndpointWithConvoTag(const ConvoTag tag,
                                      llarp::AlignedBuffer< 32 >& addr,
                                      bool& snode) const
    {
      auto itr = Sessions().find(tag);
      if(itr != Sessions().end())
      {
        snode = false;
        addr = itr->second.remote.Addr();
        return true;
      }

      for(const auto& item : m_state->m_SNodeSessions)
      {
        if(item.second.second == tag)
        {
          snode = true;
          addr = item.first;
          return true;
        }
      }
      return false;
    }

    bool
    Endpoint::IntrosetIsStale() const
    {
      return introSet().HasExpiredIntros(Now());
    }

    util::StatusObject
    Endpoint::ExtractStatus() const
    {
      auto obj = path::Builder::ExtractStatus();
      obj["identity"] = m_Identity.pub.Addr().ToString();
      return m_state->ExtractStatus(obj);
    }

    void Endpoint::Tick(llarp_time_t)
    {
      const auto now = llarp::time_now_ms();
      path::Builder::Tick(now);
      // publish descriptors
      if(ShouldPublishDescriptors(now))
      {
        RegenAndPublishIntroSet();
      }

      m_state->m_RemoteLookupFilter.Decay(now);

      // expire snode sessions
      EndpointUtil::ExpireSNodeSessions(now, m_state->m_SNodeSessions);
      // expire pending tx
      EndpointUtil::ExpirePendingTx(now, m_state->m_PendingLookups);
      // expire pending router lookups
      EndpointUtil::ExpirePendingRouterLookups(now, m_state->m_PendingRouters);

      // prefetch addrs
      for(const auto& addr : m_state->m_PrefetchAddrs)
      {
        if(!EndpointUtil::HasPathToService(addr, m_state->m_RemoteSessions))
        {
          if(!EnsurePathToService(
                 addr, [](Address, OutboundContext*) {}, 10s))
          {
            LogWarn("failed to ensure path to ", addr);
          }
        }
      }

      // deregister dead sessions
      EndpointUtil::DeregisterDeadSessions(now, m_state->m_DeadSessions);
      // tick remote sessions
      EndpointUtil::TickRemoteSessions(now, m_state->m_RemoteSessions,
                                       m_state->m_DeadSessions, Sessions());
      // expire convotags
      EndpointUtil::ExpireConvoSessions(now, Sessions());
    }

    bool
    Endpoint::Stop()
    {
      // stop remote sessions
      EndpointUtil::StopRemoteSessions(m_state->m_RemoteSessions);
      // stop snode sessions
      EndpointUtil::StopSnodeSessions(m_state->m_SNodeSessions);
      if(m_OnDown)
        m_OnDown->NotifyAsync(NotifyParams());
      return path::Builder::Stop();
    }
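
    // Pick a transaction id for a new lookup: start from a random 64-bit
    // value and increment until it does not collide with any pending lookup.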
    uint64_t
    Endpoint::GenTXID()
    {
      uint64_t txid = randint();
      const auto& lookups = m_state->m_PendingLookups;
      while(lookups.find(txid) != lookups.end())
        ++txid;
      return txid;
    }

    std::string
    Endpoint::Name() const
    {
      return m_state->m_Name + ":" + m_Identity.pub.Name();
    }

    void
    Endpoint::PutLookup(IServiceLookup* lookup, uint64_t txid)
    {
      m_state->m_PendingLookups.emplace(
          txid, std::unique_ptr< IServiceLookup >(lookup));
    }
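
    // Handle a DHT response to one of our introset lookups: verify every
    // returned encrypted introset, then hand the batch to the pending lookup
    // matching the response txid. If the lookup declines the result it is
    // re-queued so a later response with the same txid can still complete it.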
    bool
    Endpoint::HandleGotIntroMessage(dht::GotIntroMessage_constptr msg)
    {
      std::set< EncryptedIntroSet > remote;
      for(const auto& introset : msg->found)
      {
        if(not introset.Verify(Now()))
        {
          LogError(Name(), " got invalid introset");
          return false;
        }
        remote.insert(introset);
      }
      auto& lookups = m_state->m_PendingLookups;
      auto itr = lookups.find(msg->txid);
      if(itr == lookups.end())
      {
        LogWarn("invalid lookup response for hidden service endpoint ", Name(),
                " txid=", msg->txid);
        return true;
      }
      std::unique_ptr< IServiceLookup > lookup = std::move(itr->second);
      lookups.erase(itr);
      if(not lookup->HandleResponse(remote))
        lookups.emplace(msg->txid, std::move(lookup));
      return true;
    }

    bool
    Endpoint::HasInboundConvo(const Address& addr) const
    {
      for(const auto& item : Sessions())
      {
        if(item.second.remote.Addr() == addr && item.second.inbound)
          return true;
      }
      return false;
    }

    void
    Endpoint::PutSenderFor(const ConvoTag& tag, const ServiceInfo& info,
                           bool inbound)
    {
      auto itr = Sessions().find(tag);
      if(itr == Sessions().end())
      {
        itr = Sessions().emplace(tag, Session{}).first;
        itr->second.inbound = inbound;
        itr->second.remote = info;
      }
      itr->second.lastUsed = Now();
    }

    bool
    Endpoint::GetSenderFor(const ConvoTag& tag, ServiceInfo& si) const
    {
      auto itr = Sessions().find(tag);
      if(itr == Sessions().end())
        return false;
      si = itr->second.remote;
      return true;
    }

    void
    Endpoint::PutIntroFor(const ConvoTag& tag, const Introduction& intro)
    {
      auto itr = Sessions().find(tag);
      if(itr == Sessions().end())
      {
        return;
      }
      itr->second.intro = intro;
      itr->second.lastUsed = Now();
    }

    bool
    Endpoint::GetIntroFor(const ConvoTag& tag, Introduction& intro) const
    {
      auto itr = Sessions().find(tag);
      if(itr == Sessions().end())
        return false;
      intro = itr->second.intro;
      return true;
    }

    void
    Endpoint::PutReplyIntroFor(const ConvoTag& tag, const Introduction& intro)
    {
      auto itr = Sessions().find(tag);
      if(itr == Sessions().end())
      {
        return;
      }
      itr->second.replyIntro = intro;
      itr->second.lastUsed = Now();
    }

    bool
    Endpoint::GetReplyIntroFor(const ConvoTag& tag, Introduction& intro) const
    {
      auto itr = Sessions().find(tag);
      if(itr == Sessions().end())
        return false;
      intro = itr->second.replyIntro;
      return true;
    }

    bool
    Endpoint::GetConvoTagsForService(const Address& addr,
                                     std::set< ConvoTag >& tags) const
    {
      return EndpointUtil::GetConvoTagsForService(Sessions(), addr, tags);
    }

    bool
    Endpoint::GetCachedSessionKeyFor(const ConvoTag& tag,
                                     SharedSecret& secret) const
    {
      auto itr = Sessions().find(tag);
      if(itr == Sessions().end())
        return false;
      secret = itr->second.sharedKey;
      return true;
    }

    void
    Endpoint::PutCachedSessionKeyFor(const ConvoTag& tag, const SharedSecret& k)
    {
      auto itr = Sessions().find(tag);
      if(itr == Sessions().end())
      {
        itr = Sessions().emplace(tag, Session{}).first;
      }
      itr->second.sharedKey = k;
      itr->second.lastUsed = Now();
    }

    void
    Endpoint::MarkConvoTagActive(const ConvoTag& tag)
    {
      auto itr = Sessions().find(tag);
      if(itr != Sessions().end())
      {
        itr->second.lastUsed = Now();
      }
    }

    bool
    Endpoint::LoadKeyFile()
    {
      const auto& keyfile = m_state->m_Keyfile;
      if(!keyfile.empty())
      {
        if(!m_Identity.EnsureKeys(keyfile,
                                  Router()->keyManager()->needBackup()))
        {
          LogError("Can't ensure keyfile [", keyfile, "]");
          return false;
        }
      }
      else
      {
        m_Identity.RegenerateKeys();
      }
      return true;
    }

    bool
    Endpoint::Start()
    {
      m_state->m_RemoteLookupFilter.DecayInterval(500ms);
      // how can we tell if m_Identity isn't loaded?
      if(!m_DataHandler)
      {
        m_DataHandler = this;
      }
      // this does network isolation
      while(m_state->m_OnInit.size())
      {
        if(m_state->m_OnInit.front()())
          m_state->m_OnInit.pop_front();
        else
        {
          LogWarn("Can't call init of network isolation");
          return false;
        }
      }
      return true;
    }

    Endpoint::~Endpoint()
    {
      if(m_OnUp)
        m_OnUp->Stop();
      if(m_OnDown)
        m_OnDown->Stop();
      if(m_OnReady)
        m_OnReady->Stop();
    }
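
    // Publish our encrypted introset to the DHT over several distinct
    // paths: we request llarp::dht::IntroSetRelayRedundancy paths with
    // unique endpoints and send llarp::dht::IntroSetRequestsPerRelay
    // publish requests on each. Success means we stored
    // llarp::dht::IntroSetStorageRedundancy copies in total (presumably the
    // product of the other two constants, which are defined elsewhere).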
    bool
    Endpoint::PublishIntroSet(const EncryptedIntroSet& introset,
                              AbstractRouter* r)
    {
      const auto paths = GetManyPathsWithUniqueEndpoints(
          this, llarp::dht::IntroSetRelayRedundancy);

      if(paths.size() != llarp::dht::IntroSetRelayRedundancy)
      {
        LogWarn("Cannot publish intro set because we only have ", paths.size(),
                " paths, but need ", llarp::dht::IntroSetRelayRedundancy);
        return false;
      }

      // do publishing for each path selected
      size_t published = 0;

      for(const auto& path : paths)
      {
        for(size_t i = 0; i < llarp::dht::IntroSetRequestsPerRelay; ++i)
        {
          if(PublishIntroSetVia(introset, r, path, published))
            published++;
        }
      }
      if(published != llarp::dht::IntroSetStorageRedundancy)
        LogWarn("Publish introset failed: could only publish ", published,
                " copies but wanted ", llarp::dht::IntroSetStorageRedundancy);
      return published == llarp::dht::IntroSetStorageRedundancy;
    }

    struct PublishIntroSetJob : public IServiceLookup
    {
      EncryptedIntroSet m_IntroSet;
      Endpoint* m_Endpoint;
      uint64_t m_relayOrder;
      PublishIntroSetJob(Endpoint* parent, uint64_t id,
                         EncryptedIntroSet introset, uint64_t relayOrder)
          : IServiceLookup(parent, id, "PublishIntroSet")
          , m_IntroSet(std::move(introset))
          , m_Endpoint(parent)
          , m_relayOrder(relayOrder)
      {
      }

      std::shared_ptr< routing::IMessage >
      BuildRequestMessage() override
      {
        auto msg = std::make_shared< routing::DHTMessage >();
        msg->M.emplace_back(std::make_unique< dht::PublishIntroMessage >(
            m_IntroSet, txid, true, m_relayOrder));
        return msg;
      }

      bool
      HandleResponse(const std::set< EncryptedIntroSet >& response) override
      {
        if(not response.empty())
          m_Endpoint->IntroSetPublished();
        else
          m_Endpoint->IntroSetPublishFail();
        return true;
      }
    };

    void
    Endpoint::IntroSetPublishFail()
    {
      auto now = Now();
      if(ShouldPublishDescriptors(now))
      {
        RegenAndPublishIntroSet();
      }
      else if(NumInStatus(path::ePathEstablished) < 3)
      {
        if(introSet().HasExpiredIntros(now))
          ManualRebuild(1);
      }
    }

    bool
    Endpoint::PublishIntroSetVia(const EncryptedIntroSet& introset,
                                 AbstractRouter* r, path::Path_ptr path,
                                 uint64_t relayOrder)
    {
      auto job = new PublishIntroSetJob(this, GenTXID(), introset, relayOrder);
      if(job->SendRequestViaPath(path, r))
      {
        m_state->m_LastPublishAttempt = Now();
        return true;
      }
      return false;
    }

    void
    Endpoint::ResetInternalState()
    {
      path::Builder::ResetInternalState();
      static auto resetState = [](auto& container, auto getter) {
        std::for_each(container.begin(), container.end(), [getter](auto& item) {
          getter(item)->ResetInternalState();
        });
      };
      resetState(m_state->m_RemoteSessions,
                 [](const auto& item) { return item.second; });
      resetState(m_state->m_SNodeSessions,
                 [](const auto& item) { return item.second.first; });
    }
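
    // Decide whether it is time to (re)publish our introset: retry on the
    // shorter INTROSET_PUBLISH_RETRY_INTERVAL while the current introset
    // contains expired intros, otherwise wait the full
    // INTROSET_PUBLISH_INTERVAL since the last publish attempt.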
    bool
    Endpoint::ShouldPublishDescriptors(llarp_time_t now) const
    {
      if(not m_PublishIntroSet)
        return false;
      auto next_pub = m_state->m_LastPublishAttempt
          + (m_state->m_IntroSet.HasExpiredIntros(now)
                 ? INTROSET_PUBLISH_RETRY_INTERVAL
                 : INTROSET_PUBLISH_INTERVAL);
      return now >= next_pub;
    }

    void
    Endpoint::IntroSetPublished()
    {
      const auto now = Now();
      // We usually get 4 confirmations back (one for each DHT location), which
      // is noisy: suppress this log message if we already had a confirmation in
      // the last second.
      if(m_state->m_LastPublish < now - 1s)
        LogInfo(Name(), " IntroSet publish confirmed");
      else
        LogDebug(Name(), " Additional IntroSet publish confirmed");

      m_state->m_LastPublish = now;
      if(m_OnReady)
        m_OnReady->NotifyAsync(NotifyParams());
      m_OnReady = nullptr;
    }

    void
    Endpoint::IsolatedNetworkMainLoop()
    {
      m_state->m_IsolatedNetLoop = llarp_make_ev_loop();
      m_state->m_IsolatedLogic = std::make_shared< llarp::Logic >();
      if(SetupNetworking())
        llarp_ev_loop_run_single_process(m_state->m_IsolatedNetLoop,
                                         m_state->m_IsolatedLogic);
      else
      {
        m_state->m_IsolatedNetLoop.reset();
        m_state->m_IsolatedLogic.reset();
      }
    }
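
    // Choose the next hop for a path under construction. We exclude
    // previously used routers, everything on the snode blacklist, exit
    // nodes as a first hop, and (for the terminal hop) endpoints already
    // used by existing paths, so that our paths terminate at distinct
    // routers.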
    bool
    Endpoint::SelectHop(llarp_nodedb* db, const std::set< RouterID >& prev,
                        RouterContact& cur, size_t hop, path::PathRole roles)
    {
      std::set< RouterID > exclude = prev;
      for(const auto& snode : SnodeBlacklist())
        exclude.insert(snode);
      if(hop == 0)
      {
        const auto exits = GetExitRouters();
        // exclude exit node as first hop in any paths
        exclude.insert(exits.begin(), exits.end());
      }
      if(hop == numHops - 1)
      {
        // diversify endpoints
        ForEachPath([&exclude](const path::Path_ptr& path) {
          exclude.insert(path->Endpoint());
        });
      }
      return path::Builder::SelectHop(db, exclude, cur, hop, roles);
    }

    void
    Endpoint::PathBuildStarted(path::Path_ptr path)
    {
      path::Builder::PathBuildStarted(path);
    }

    std::set< RouterID >
    Endpoint::GetExitRouters() const
    {
      return m_ExitMap.TransformValues< RouterID >(
          [](const exit::BaseSession_ptr& ptr) -> RouterID {
            return ptr->Endpoint();
          });
    }

    bool
    Endpoint::ShouldBundleRC() const
    {
      return m_state->m_BundleRC;
    }
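
    // Create an OutboundContext for a remote hidden service from a freshly
    // looked-up introset and inform any pending callbacks waiting on that
    // address. If we already hold the maximum number of contexts for the
    // address, an existing one is handed to the callbacks instead.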
    void
    Endpoint::PutNewOutboundContext(const service::IntroSet& introset)
    {
      Address addr;
      introset.A.CalculateAddress(addr.as_array());

      auto& remoteSessions = m_state->m_RemoteSessions;
      auto& serviceLookups = m_state->m_PendingServiceLookups;

      if(remoteSessions.count(addr) >= MAX_OUTBOUND_CONTEXT_COUNT)
      {
        auto itr = remoteSessions.find(addr);

        auto range = serviceLookups.equal_range(addr);
        auto i = range.first;
        if(i != range.second)
        {
          i->second(addr, itr->second.get());
          ++i;
        }
        serviceLookups.erase(addr);
        return;
      }

      auto it = remoteSessions.emplace(
          addr, std::make_shared< OutboundContext >(introset, this));
      LogInfo("Created New outbound context for ", addr.ToString());

      // inform pending
      auto range = serviceLookups.equal_range(addr);
      auto itr = range.first;
      if(itr != range.second)
      {
        itr->second(addr, it->second.get());
        ++itr;
      }
      serviceLookups.erase(addr);
    }

    void
    Endpoint::HandleVerifyGotRouter(dht::GotRouterMessage_constptr msg,
                                    llarp_async_verify_rc* j)
    {
      auto& pendingRouters = m_state->m_PendingRouters;
      auto itr = pendingRouters.find(j->rc.pubkey);
      if(itr != pendingRouters.end())
      {
        if(j->valid)
          itr->second.InformResult(msg->foundRCs);
        else
          itr->second.InformResult({});
        pendingRouters.erase(itr);
      }
      delete j;
    }

    bool
    Endpoint::HandleGotRouterMessage(dht::GotRouterMessage_constptr msg)
    {
      if(not msg->foundRCs.empty())
      {
        for(const auto& rc : msg->foundRCs)
        {
          llarp_async_verify_rc* job = new llarp_async_verify_rc();
          job->nodedb = Router()->nodedb();
          job->cryptoworker = Router()->threadpool();
          job->diskworker = Router()->diskworker();
          job->logic = Router()->logic();
          job->hook = std::bind(&Endpoint::HandleVerifyGotRouter, this, msg,
                                std::placeholders::_1);
          job->rc = rc;
          llarp_nodedb_async_verify(job);
        }
      }
      else
      {
        auto& routers = m_state->m_PendingRouters;
        auto itr = routers.begin();
        while(itr != routers.end())
        {
          if(itr->second.txid == msg->txid)
          {
            itr->second.InformResult({});
            itr = routers.erase(itr);
          }
          else
            ++itr;
        }
      }
      return true;
    }

    void
    Endpoint::EnsureRouterIsKnown(const RouterID& router)
    {
      if(router.IsZero())
        return;
      if(!Router()->nodedb()->Has(router))
      {
        LookupRouterAnon(router, nullptr);
      }
    }

    bool
    Endpoint::LookupRouterAnon(RouterID router, RouterLookupHandler handler)
    {
      auto& routers = m_state->m_PendingRouters;
      if(routers.find(router) == routers.end())
      {
        auto path = GetEstablishedPathClosestTo(router);
        routing::DHTMessage msg;
        auto txid = GenTXID();
        msg.M.emplace_back(
            std::make_unique< dht::FindRouterMessage >(txid, router));

        if(path && path->SendRoutingMessage(msg, Router()))
        {
          routers.emplace(router, RouterLookupJob(this, handler));
          return true;
        }
      }
      return false;
    }

    void
    Endpoint::HandlePathBuilt(path::Path_ptr p)
    {
      p->SetDataHandler(util::memFn(&Endpoint::HandleHiddenServiceFrame, this));
      p->SetDropHandler(util::memFn(&Endpoint::HandleDataDrop, this));
      p->SetDeadChecker(util::memFn(&Endpoint::CheckPathIsDead, this));
      path::Builder::HandlePathBuilt(p);
    }

    bool
    Endpoint::HandleDataDrop(path::Path_ptr p, const PathID_t& dst,
                             uint64_t seq)
    {
      LogWarn(Name(), " message ", seq, " dropped by endpoint ", p->Endpoint(),
              " via ", dst);
      return true;
    }

    std::unordered_map< std::string, std::string >
    Endpoint::NotifyParams() const
    {
      return {{"LOKINET_ADDR", m_Identity.pub.Addr().ToString()}};
    }

    void
    Endpoint::FlushRecvData()
    {
      do
      {
        auto maybe = m_RecvQueue.tryPopFront();
        if(not maybe.has_value())
          return;
        auto ev = std::move(maybe.value());
        ProtocolMessage::ProcessAsync(ev.fromPath, ev.pathid, ev.msg);
      } while(true);
    }
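
    // Queue inbound data for processing on the logic thread. A flush is
    // scheduled when the queue is full (so we don't drop events) or when it
    // was empty (so a flush is pending once this first event is pushed).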
    void
    Endpoint::QueueRecvData(RecvDataEvent ev)
    {
      if(m_RecvQueue.full() || m_RecvQueue.empty())
      {
        auto self = this;
        LogicCall(m_router->logic(), [self]() { self->FlushRecvData(); });
      }
      m_RecvQueue.pushBack(std::move(ev));
    }
    bool
    Endpoint::HandleDataMessage(path::Path_ptr path, const PathID_t from,
                                std::shared_ptr< ProtocolMessage > msg)
    {
      msg->sender.UpdateAddr();
      PutSenderFor(msg->tag, msg->sender, true);
      PutReplyIntroFor(msg->tag, path->intro);
      Introduction intro;
      intro.pathID = from;
      intro.router = PubKey(path->Endpoint());
      intro.expiresAt = std::min(path->ExpireTime(), msg->introReply.expiresAt);
      PutIntroFor(msg->tag, intro);
      return ProcessDataMessage(msg);
    }

    bool
    Endpoint::HasPathToSNode(const RouterID ident) const
    {
      auto range = m_state->m_SNodeSessions.equal_range(ident);
      auto itr = range.first;
      while(itr != range.second)
      {
        if(itr->second.first->IsReady())
        {
          return true;
        }
        ++itr;
      }
      return false;
    }
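
    // Dispatch a decrypted protocol message by type: IP traffic (v4 or v6)
    // is queued for delivery to the user under the inbound-traffic mutex,
    // control messages are accepted but currently ignored, and anything
    // else is rejected.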
    bool
    Endpoint::ProcessDataMessage(std::shared_ptr< ProtocolMessage > msg)
    {
      if(msg->proto == eProtocolTrafficV4 || msg->proto == eProtocolTrafficV6)
      {
        util::Lock l(m_state->m_InboundTrafficQueueMutex);
        m_state->m_InboundTrafficQueue.emplace(msg);
        return true;
      }
      if(msg->proto == eProtocolControl)
      {
        // TODO: implement me (?)
        // right now it's just random noise
        return true;
      }
      return false;
    }

    void
    Endpoint::RemoveConvoTag(const ConvoTag& t)
    {
      Sessions().erase(t);
    }
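
    // Handle a protocol frame arriving on one of our paths. A frame with R
    // set is a discard notice: we verify it came from the expected sender
    // and drop the convo tag. Otherwise we decrypt and verify the frame
    // asynchronously; on failure we reply with a signed discard frame of
    // our own so the sender invalidates the broken convo tag.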
    bool
    Endpoint::HandleHiddenServiceFrame(path::Path_ptr p,
                                       const ProtocolFrame& frame)
    {
      if(frame.R)
      {
        // handle discard
        ServiceInfo si;
        if(!GetSenderFor(frame.T, si))
          return false;
        // verify source
        if(!frame.Verify(si))
          return false;
        // remove convotag because it doesn't exist on the remote end
        LogWarn("remove convotag T=", frame.T);
        RemoveConvoTag(frame.T);
        return true;
      }
      if(!frame.AsyncDecryptAndVerify(EndpointLogic(), p, CryptoWorker(),
                                      m_Identity, m_DataHandler))
      {
        // send discard
        ProtocolFrame f;
        f.R = 1;
        f.T = frame.T;
        f.F = p->intro.pathID;
        if(!f.Sign(m_Identity))
          return false;
        {
          LogWarn("invalidating convotag T=", frame.T);
          util::Lock lock(m_state->m_SendQueueMutex);
          m_state->m_SendQueue.emplace_back(
              std::make_shared< const routing::PathTransferMessage >(f,
                                                                     frame.F),
              p);
        }
        return true;
      }
      return true;
    }

    void Endpoint::HandlePathDied(path::Path_ptr)
    {
      RegenAndPublishIntroSet(true);
    }

    bool
    Endpoint::CheckPathIsDead(path::Path_ptr, llarp_time_t dlt)
    {
      return dlt > path::alive_timeout;
    }

    bool
    Endpoint::OnLookup(const Address& addr,
                       nonstd::optional< IntroSet > introset,
                       const RouterID& endpoint)
    {
      const auto now = Router()->Now();
      auto& fails = m_state->m_ServiceLookupFails;
      auto& lookups = m_state->m_PendingServiceLookups;
      if(not introset.has_value() || introset->IsExpired(now))
      {
        LogError(Name(), " failed to lookup ", addr.ToString(), " from ",
                 endpoint);
        fails[endpoint] = fails[endpoint] + 1;
        // inform all
        auto range = lookups.equal_range(addr);
        auto itr = range.first;
        if(itr != range.second)
        {
          itr->second(addr, nullptr);
          itr = lookups.erase(itr);
        }
        return false;
      }
      // check for established outbound context
      if(m_state->m_RemoteSessions.count(addr) > 0)
        return true;

      PutNewOutboundContext(introset.value());
      return true;
    }

    void
    Endpoint::MarkAddressOutbound(const Address& addr)
    {
      m_state->m_OutboundSessions.insert(addr);
    }

    bool
    Endpoint::WantsOutboundSession(const Address& addr) const
    {
      return m_state->m_OutboundSessions.count(addr) > 0;
    }
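
    // Ensure we have (or are building) an outbound session to a remote
    // hidden service. If a session already exists the hook fires
    // immediately; otherwise, after a rate-limit check, we fan out introset
    // lookups over NumParallelLookups distinct path endpoints with
    // RequestsPerLookup requests each, every request carrying a distinct
    // relay order, presumably so each one targets a different redundant
    // DHT location.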
    bool
    Endpoint::EnsurePathToService(const Address remote, PathEnsureHook hook,
                                  llarp_time_t /*timeoutMS*/)
    {
      /// how many routers to use for lookups
      static constexpr size_t NumParallelLookups = 2;
      /// how many requests per router
      static constexpr size_t RequestsPerLookup = 2;

      LogInfo(Name(), " Ensure Path to ", remote.ToString());

      MarkAddressOutbound(remote);
      auto& sessions = m_state->m_RemoteSessions;
      {
        auto itr = sessions.find(remote);
        if(itr != sessions.end())
        {
          hook(itr->first, itr->second.get());
          return true;
        }
      }

      // filter check for address
      if(not m_state->m_RemoteLookupFilter.Insert(remote))
        return false;

      auto& lookups = m_state->m_PendingServiceLookups;

      const auto paths =
          GetManyPathsWithUniqueEndpoints(this, NumParallelLookups);

      using namespace std::placeholders;
      size_t lookedUp = 0;
      const dht::Key_t location = remote.ToKey();
      uint64_t order = 0;
      for(const auto& path : paths)
      {
        for(size_t count = 0; count < RequestsPerLookup; ++count)
        {
          HiddenServiceAddressLookup* job = new HiddenServiceAddressLookup(
              this, util::memFn(&Endpoint::OnLookup, this), location,
              PubKey{remote.as_array()}, order, GenTXID());
          LogInfo("doing lookup for ", remote, " via ", path->Endpoint(),
                  " at ", location, " order=", order);
          order++;
          if(job->SendRequestViaPath(path, Router()))
          {
            lookups.emplace(remote, hook);
            lookedUp++;
          }
          else
            LogError(Name(), " send via path failed for lookup");
        }
      }
      return lookedUp == (NumParallelLookups * RequestsPerLookup);
    }
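
    // Ensure a direct exit session to a service node exists, creating one
    // (keyed by a random convo tag) if needed, then invoke the hook once
    // the session is ready. The session count is capped at
    // MaxConcurrentSNodeSessions as a client-side limit.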
    bool
    Endpoint::EnsurePathToSNode(const RouterID snode, SNodeEnsureHook h)
    {
      static constexpr size_t MaxConcurrentSNodeSessions = 16;
      auto& nodeSessions = m_state->m_SNodeSessions;
      if(nodeSessions.size() >= MaxConcurrentSNodeSessions)
      {
        // a quick client-side workaround before we do proper limiting
        LogError(Name(), " has too many snode sessions");
        return false;
      }
      using namespace std::placeholders;
      if(nodeSessions.count(snode) == 0)
      {
        ConvoTag tag;
        // TODO: check for tag collisions; we don't yet, but some day we may
        // have to
        tag.Randomize();
        auto session = std::make_shared< exit::SNodeSession >(
            snode,
            [=](const llarp_buffer_t& pkt) -> bool {
              /// TODO: V6
              return HandleInboundPacket(tag, pkt, eProtocolTrafficV4);
            },
            Router(), numPaths, numHops, false, ShouldBundleRC());
        m_state->m_SNodeSessions.emplace(snode, std::make_pair(session, tag));
      }
      EnsureRouterIsKnown(snode);
      auto range = nodeSessions.equal_range(snode);
      auto itr = range.first;
      while(itr != range.second)
      {
        if(itr->second.first->IsReady())
          h(snode, itr->second.first);
        else
        {
          itr->second.first->AddReadyHook(std::bind(h, snode, _1));
          itr->second.first->BuildOne();
        }
        ++itr;
      }
      return true;
    }

    bool
    Endpoint::SendToSNodeOrQueue(const RouterID& addr,
                                 const llarp_buffer_t& buf)
    {
      auto pkt = std::make_shared< net::IPPacket >();
      if(!pkt->Load(buf))
        return false;
      EnsurePathToSNode(addr, [pkt](RouterID, exit::BaseSession_ptr s) {
        if(s)
          s->QueueUpstreamTraffic(*pkt, routing::ExitPadSize);
      });
      return true;
    }
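
    // Flush all queued work for this endpoint: deliver inbound traffic to
    // the user (on the isolated logic thread when network isolation is
    // active), flush upstream traffic for all sessions, drain the outbound
    // send queue, and finally pump the link layer.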
    void Endpoint::Pump(llarp_time_t)
    {
      const auto& sessions = m_state->m_SNodeSessions;
      auto& queue = m_state->m_InboundTrafficQueue;

      auto epPump = [&]() {
        FlushRecvData();
        // send downstream packets to user for snode
        for(const auto& item : sessions)
          item.second.first->FlushDownstream();
        // send downstream traffic to user for hidden service
        util::Lock lock(m_state->m_InboundTrafficQueueMutex);
        while(not queue.empty())
        {
          const auto& msg = queue.top();
          const llarp_buffer_t buf(msg->payload);
          HandleInboundPacket(msg->tag, buf, msg->proto);
          queue.pop();
        }
      };

      if(NetworkIsIsolated())
      {
        LogicCall(EndpointLogic(), epPump);
      }
      else
      {
        epPump();
      }
      auto router = Router();
      // TODO: locking on this container
      for(const auto& item : m_state->m_RemoteSessions)
        item.second->FlushUpstream();
      // TODO: locking on this container
      for(const auto& item : sessions)
        item.second.first->FlushUpstream();
      {
        util::Lock lock(m_state->m_SendQueueMutex);
        // send outbound traffic
        for(const auto& item : m_state->m_SendQueue)
        {
          item.second->SendRoutingMessage(*item.first, router);
          MarkConvoTagActive(item.first->T.T);
        }
        m_state->m_SendQueue.clear();
      }

      UpstreamFlush(router);
      router->linkManager().PumpLinks();
    }

    bool
    Endpoint::EnsureConvo(const AlignedBuffer< 32 > /*addr*/, bool snode,
                          ConvoEventListener_ptr /*ev*/)
    {
      if(snode)
      {
      }
      // TODO: something meaningful
      return false;
    }
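
    // Send data to a remote hidden service, or queue it until a path
    // exists. For an inbound convo we reuse the cached session key, reply
    // intro and path of an existing tag; for an outbound convo we either
    // hand the buffer to a ready OutboundContext or stash it in
    // m_PendingTraffic and kick off EnsurePathToService.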
    bool
    Endpoint::SendToServiceOrQueue(const service::Address& remote,
                                   const llarp_buffer_t& data, ProtocolType t)
    {
      if(data.sz == 0)
        return false;
      // inbound conversation
      const auto now = Now();

      if(HasInboundConvo(remote))
      {
        auto transfer = std::make_shared< routing::PathTransferMessage >();
        ProtocolFrame& f = transfer->T;
        std::shared_ptr< path::Path > p;
        std::set< ConvoTag > tags;
        if(GetConvoTagsForService(remote, tags))
        {
          // the remote guy's intro
          Introduction remoteIntro;
          Introduction replyPath;
          SharedSecret K;
          // pick tag
          for(const auto& tag : tags)
          {
            if(tag.IsZero())
              continue;
            if(!GetCachedSessionKeyFor(tag, K))
              continue;
            if(!GetReplyIntroFor(tag, replyPath))
              continue;
            if(!GetIntroFor(tag, remoteIntro))
              continue;
            // get path for intro
            ForEachPath([&](path::Path_ptr path) {
              if(path->intro == replyPath)
              {
                p = path;
                return;
              }
              if(p && p->ExpiresSoon(now) && path->IsReady()
                 && path->intro.router == replyPath.router)
              {
                p = path;
              }
            });
            if(p)
            {
              f.T = tag;
            }
          }
          if(p)
          {
            // TODO: check expiration of our end
            auto m = std::make_shared< ProtocolMessage >(f.T);
            m->PutBuffer(data);
            f.N.Randomize();
            f.C.Zero();
            transfer->Y.Randomize();
            m->proto = t;
            m->introReply = p->intro;
            PutReplyIntroFor(f.T, m->introReply);
            m->sender = m_Identity.pub;
            m->seqno = GetSeqNoForConvo(f.T);
            f.S = 1;
            f.F = m->introReply.pathID;
            transfer->P = remoteIntro.pathID;
            auto self = this;
            return CryptoWorker()->addJob([transfer, p, m, K, self]() {
              if(not transfer->T.EncryptAndSign(*m, K, self->m_Identity))
              {
                LogError("failed to encrypt and sign");
                return;
              }
              util::Lock lock(self->m_state->m_SendQueueMutex);
              self->m_state->m_SendQueue.emplace_back(transfer, p);
            });
          }
        }
      }
      else
      {
        auto& sessions = m_state->m_RemoteSessions;
        auto range = sessions.equal_range(remote);
        auto itr = range.first;
        while(itr != range.second)
        {
          if(itr->second->ReadyToSend())
          {
            itr->second->AsyncEncryptAndSendTo(data, t);
            return true;
          }
          ++itr;
        }
        // if we want to make an outbound session
        if(WantsOutboundSession(remote))
        {
          // add pending traffic
          auto& traffic = m_state->m_PendingTraffic;
          traffic[remote].emplace_back(data, t);
          return EnsurePathToService(
              remote,
              [self = this](Address addr, OutboundContext* ctx) {
                if(ctx)
                {
                  ctx->UpdateIntroSet();
                  for(auto& pending : self->m_state->m_PendingTraffic[addr])
                  {
                    ctx->AsyncEncryptAndSendTo(pending.Buffer(),
                                               pending.protocol);
                  }
                }
                self->m_state->m_PendingTraffic.erase(addr);
              },
              1500ms);
        }
      }
      return false;
    }

    bool
    Endpoint::HasConvoTag(const ConvoTag& t) const
    {
      return Sessions().find(t) != Sessions().end();
    }

    uint64_t
    Endpoint::GetSeqNoForConvo(const ConvoTag& tag)
    {
      auto itr = Sessions().find(tag);
      if(itr == Sessions().end())
        return 0;
      return ++(itr->second.seqno);
    }

    bool
    Endpoint::ShouldBuildMore(llarp_time_t now) const
    {
      if(path::Builder::BuildCooldownHit(now))
        return false;
      size_t numBuilding = NumInStatus(path::ePathBuilding);
      if(numBuilding > 0)
        return false;
      return ((now - lastBuild) > path::intro_path_spread)
          || NumInStatus(path::ePathEstablished) < path::min_intro_paths;
    }

    std::shared_ptr< Logic >
    Endpoint::RouterLogic()
    {
      return Router()->logic();
    }

    std::shared_ptr< Logic >
    Endpoint::EndpointLogic()
    {
      return m_state->m_IsolatedLogic ? m_state->m_IsolatedLogic
                                      : Router()->logic();
    }

    std::shared_ptr< llarp::thread::ThreadPool >
    Endpoint::CryptoWorker()
    {
      return Router()->threadpool();
    }

    AbstractRouter*
    Endpoint::Router()
    {
      return m_state->m_Router;
    }

    const std::set< RouterID >&
    Endpoint::SnodeBlacklist() const
    {
      return m_state->m_SnodeBlacklist;
    }

    const IntroSet&
    Endpoint::introSet() const
    {
      return m_state->m_IntroSet;
    }

    IntroSet&
    Endpoint::introSet()
    {
      return m_state->m_IntroSet;
    }

    const ConvoMap&
    Endpoint::Sessions() const
    {
      return m_state->m_Sessions;
    }

    ConvoMap&
    Endpoint::Sessions()
    {
      return m_state->m_Sessions;
    }
  }  // namespace service
}  // namespace llarp