mirror of
https://github.com/oxen-io/lokinet
synced 2023-12-14 06:53:00 +01:00
Merge pull request #572 from michael-loki/service_endpointutil
Refactor well named functionality in service::Endpoint into new struct
This commit is contained in:
commit
c576b8dd9b
5 changed files with 226 additions and 113 deletions
|
@ -217,6 +217,7 @@ set(LIB_SRC
|
|||
service/async_key_exchange.cpp
|
||||
service/config.cpp
|
||||
service/context.cpp
|
||||
service/endpoint_util.cpp
|
||||
service/endpoint.cpp
|
||||
service/handler.cpp
|
||||
service/hidden_service_address_lookup.cpp
|
||||
|
|
|
@ -10,6 +10,7 @@
|
|||
#include <nodedb.hpp>
|
||||
#include <profiling.hpp>
|
||||
#include <router/abstractrouter.hpp>
|
||||
#include <service/endpoint_util.hpp>
|
||||
#include <service/hidden_service_address_lookup.hpp>
|
||||
#include <service/outbound_context.hpp>
|
||||
#include <service/protocol.hpp>
|
||||
|
@ -147,12 +148,8 @@ namespace llarp
|
|||
void
|
||||
Endpoint::FlushSNodeTraffic()
|
||||
{
|
||||
auto itr = m_SNodeSessions.begin();
|
||||
while(itr != m_SNodeSessions.end())
|
||||
{
|
||||
itr->second->Flush();
|
||||
++itr;
|
||||
}
|
||||
std::for_each(m_SNodeSessions.begin(), m_SNodeSessions.end(),
|
||||
[](auto& x) { x.second->Flush(); });
|
||||
}
|
||||
|
||||
util::StatusObject
|
||||
|
@ -210,53 +207,11 @@ namespace llarp
|
|||
}
|
||||
|
||||
// expire snode sessions
|
||||
{
|
||||
auto itr = m_SNodeSessions.begin();
|
||||
while(itr != m_SNodeSessions.end())
|
||||
{
|
||||
if(itr->second->ShouldRemove() && itr->second->IsStopped())
|
||||
{
|
||||
itr = m_SNodeSessions.erase(itr);
|
||||
continue;
|
||||
}
|
||||
// expunge next tick
|
||||
if(itr->second->IsExpired(now))
|
||||
itr->second->Stop();
|
||||
|
||||
++itr;
|
||||
}
|
||||
}
|
||||
EndpointUtil::ExpireSNodeSessions(now, m_SNodeSessions);
|
||||
// expire pending tx
|
||||
{
|
||||
auto itr = m_PendingLookups.begin();
|
||||
while(itr != m_PendingLookups.end())
|
||||
{
|
||||
if(itr->second->IsTimedOut(now))
|
||||
{
|
||||
std::unique_ptr< IServiceLookup > lookup = std::move(itr->second);
|
||||
|
||||
LogInfo(lookup->name, " timed out txid=", lookup->txid);
|
||||
lookup->HandleResponse({});
|
||||
itr = m_PendingLookups.erase(itr);
|
||||
}
|
||||
else
|
||||
++itr;
|
||||
}
|
||||
}
|
||||
EndpointUtil::ExpirePendingTx(now, m_PendingLookups);
|
||||
// expire pending router lookups
|
||||
{
|
||||
auto itr = m_PendingRouters.begin();
|
||||
while(itr != m_PendingRouters.end())
|
||||
{
|
||||
if(itr->second.IsExpired(now))
|
||||
{
|
||||
LogInfo("lookup for ", itr->first, " timed out");
|
||||
itr = m_PendingRouters.erase(itr);
|
||||
}
|
||||
else
|
||||
++itr;
|
||||
}
|
||||
}
|
||||
EndpointUtil::ExpirePendingRouterLookups(now, m_PendingRouters);
|
||||
|
||||
// prefetch addrs
|
||||
for(const auto& addr : m_PrefetchAddrs)
|
||||
|
@ -265,8 +220,8 @@ namespace llarp
|
|||
{
|
||||
if(!EnsurePathToService(
|
||||
addr,
|
||||
[](__attribute__((unused)) Address addr,
|
||||
__attribute__((unused)) OutboundContext* ctx) {},
|
||||
[](ABSL_ATTRIBUTE_UNUSED Address addr,
|
||||
ABSL_ATTRIBUTE_UNUSED OutboundContext* ctx) {},
|
||||
10000))
|
||||
{
|
||||
LogWarn("failed to ensure path to ", addr);
|
||||
|
@ -316,57 +271,20 @@ namespace llarp
|
|||
#endif
|
||||
|
||||
// deregister dead sessions
|
||||
{
|
||||
auto itr = m_DeadSessions.begin();
|
||||
while(itr != m_DeadSessions.end())
|
||||
{
|
||||
if(itr->second->IsDone(now))
|
||||
itr = m_DeadSessions.erase(itr);
|
||||
else
|
||||
++itr;
|
||||
}
|
||||
}
|
||||
EndpointUtil::DeregisterDeadSessions(now, m_DeadSessions);
|
||||
// tick remote sessions
|
||||
{
|
||||
auto itr = m_RemoteSessions.begin();
|
||||
while(itr != m_RemoteSessions.end())
|
||||
{
|
||||
if(itr->second->Tick(now))
|
||||
{
|
||||
itr->second->Stop();
|
||||
m_DeadSessions.emplace(itr->first, std::move(itr->second));
|
||||
itr = m_RemoteSessions.erase(itr);
|
||||
}
|
||||
else
|
||||
++itr;
|
||||
}
|
||||
}
|
||||
EndpointUtil::TickRemoteSessions(now, m_RemoteSessions, m_DeadSessions);
|
||||
// expire convotags
|
||||
{
|
||||
auto itr = m_Sessions.begin();
|
||||
while(itr != m_Sessions.end())
|
||||
{
|
||||
if(itr->second.IsExpired(now))
|
||||
itr = m_Sessions.erase(itr);
|
||||
else
|
||||
++itr;
|
||||
}
|
||||
}
|
||||
EndpointUtil::ExpireConvoSessions(now, m_Sessions);
|
||||
}
|
||||
|
||||
bool
|
||||
Endpoint::Stop()
|
||||
{
|
||||
// stop remote sessions
|
||||
for(auto& item : m_RemoteSessions)
|
||||
{
|
||||
item.second->Stop();
|
||||
}
|
||||
EndpointUtil::StopRemoteSessions(m_RemoteSessions);
|
||||
// stop snode sessions
|
||||
for(auto& item : m_SNodeSessions)
|
||||
{
|
||||
item.second->Stop();
|
||||
}
|
||||
EndpointUtil::StopSnodeSessions(m_SNodeSessions);
|
||||
return path::Builder::Stop();
|
||||
}
|
||||
|
||||
|
@ -388,15 +306,7 @@ namespace llarp
|
|||
bool
|
||||
Endpoint::HasPathToService(const Address& addr) const
|
||||
{
|
||||
auto range = m_RemoteSessions.equal_range(addr);
|
||||
Sessions::const_iterator itr = range.first;
|
||||
while(itr != range.second)
|
||||
{
|
||||
if(itr->second->ReadyToSend())
|
||||
return true;
|
||||
++itr;
|
||||
}
|
||||
return false;
|
||||
return EndpointUtil::HasPathToService(addr, m_RemoteSessions);
|
||||
}
|
||||
|
||||
void
|
||||
|
@ -958,8 +868,7 @@ namespace llarp
|
|||
|
||||
bool
|
||||
Endpoint::EnsurePathToService(const Address& remote, PathEnsureHook hook,
|
||||
__attribute__((unused))
|
||||
llarp_time_t timeoutMS,
|
||||
ABSL_ATTRIBUTE_UNUSED llarp_time_t timeoutMS,
|
||||
bool randomPath)
|
||||
{
|
||||
path::Path* path = nullptr;
|
||||
|
|
|
@ -332,6 +332,8 @@ namespace llarp
|
|||
std::unique_ptr< exit::BaseSession > m_Exit;
|
||||
|
||||
private:
|
||||
friend struct EndpointUtil;
|
||||
|
||||
AbstractRouter* m_Router;
|
||||
llarp_threadpool* m_IsolatedWorker = nullptr;
|
||||
Logic* m_IsolatedLogic = nullptr;
|
||||
|
@ -387,8 +389,9 @@ namespace llarp
|
|||
}
|
||||
};
|
||||
|
||||
std::unordered_map< RouterID, RouterLookupJob, RouterID::Hash >
|
||||
m_PendingRouters;
|
||||
using PendingRouters =
|
||||
std::unordered_map< RouterID, RouterLookupJob, RouterID::Hash >;
|
||||
PendingRouters m_PendingRouters;
|
||||
|
||||
uint64_t m_CurrentPublishTX = 0;
|
||||
llarp_time_t m_LastPublish = 0;
|
||||
|
@ -397,8 +400,10 @@ namespace llarp
|
|||
/// our introset
|
||||
service::IntroSet m_IntroSet;
|
||||
/// pending remote service lookups by id
|
||||
std::unordered_map< uint64_t, std::unique_ptr< service::IServiceLookup > >
|
||||
m_PendingLookups;
|
||||
using PendingLookups =
|
||||
std::unordered_map< uint64_t,
|
||||
std::unique_ptr< service::IServiceLookup > >;
|
||||
PendingLookups m_PendingLookups;
|
||||
/// prefetch remote address list
|
||||
std::set< Address > m_PrefetchAddrs;
|
||||
/// hidden service tag
|
||||
|
@ -409,10 +414,9 @@ namespace llarp
|
|||
std::list< std::function< bool(void) > > m_OnInit;
|
||||
|
||||
/// conversations
|
||||
using ConvoMap_t =
|
||||
std::unordered_map< ConvoTag, Session, ConvoTag::Hash >;
|
||||
using ConvoMap = std::unordered_map< ConvoTag, Session, ConvoTag::Hash >;
|
||||
|
||||
ConvoMap_t m_Sessions;
|
||||
ConvoMap m_Sessions;
|
||||
|
||||
std::unordered_map< Tag, CachedTagResult, Tag::Hash > m_PrefetchedTags;
|
||||
};
|
||||
|
|
153
llarp/service/endpoint_util.cpp
Normal file
153
llarp/service/endpoint_util.cpp
Normal file
|
@ -0,0 +1,153 @@
|
|||
#include <service/endpoint_util.hpp>
|
||||
|
||||
#include <service/outbound_context.hpp>
|
||||
#include <util/logger.hpp>
|
||||
|
||||
namespace llarp
|
||||
{
|
||||
namespace service
|
||||
{
|
||||
void
|
||||
EndpointUtil::ExpireSNodeSessions(llarp_time_t now,
|
||||
Endpoint::SNodeSessions& sessions)
|
||||
{
|
||||
auto itr = sessions.begin();
|
||||
while(itr != sessions.end())
|
||||
{
|
||||
if(itr->second->ShouldRemove() && itr->second->IsStopped())
|
||||
{
|
||||
itr = sessions.erase(itr);
|
||||
continue;
|
||||
}
|
||||
// expunge next tick
|
||||
if(itr->second->IsExpired(now))
|
||||
{
|
||||
itr->second->Stop();
|
||||
}
|
||||
|
||||
++itr;
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
EndpointUtil::ExpirePendingTx(llarp_time_t now,
|
||||
Endpoint::PendingLookups& lookups)
|
||||
{
|
||||
for(auto itr = lookups.begin(); itr != lookups.end();)
|
||||
{
|
||||
if(!itr->second->IsTimedOut(now))
|
||||
{
|
||||
++itr;
|
||||
continue;
|
||||
}
|
||||
std::unique_ptr< IServiceLookup > lookup = std::move(itr->second);
|
||||
|
||||
LogInfo(lookup->name, " timed out txid=", lookup->txid);
|
||||
lookup->HandleResponse({});
|
||||
itr = lookups.erase(itr);
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
EndpointUtil::ExpirePendingRouterLookups(llarp_time_t now,
|
||||
Endpoint::PendingRouters& routers)
|
||||
{
|
||||
for(auto itr = routers.begin(); itr != routers.end();)
|
||||
{
|
||||
if(!itr->second.IsExpired(now))
|
||||
{
|
||||
++itr;
|
||||
continue;
|
||||
}
|
||||
LogInfo("lookup for ", itr->first, " timed out");
|
||||
itr = routers.erase(itr);
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
EndpointUtil::DeregisterDeadSessions(llarp_time_t now,
|
||||
Endpoint::Sessions& sessions)
|
||||
{
|
||||
auto itr = sessions.begin();
|
||||
while(itr != sessions.end())
|
||||
{
|
||||
if(itr->second->IsDone(now))
|
||||
{
|
||||
itr = sessions.erase(itr);
|
||||
}
|
||||
else
|
||||
{
|
||||
++itr;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
EndpointUtil::TickRemoteSessions(llarp_time_t now,
|
||||
Endpoint::Sessions& remoteSessions,
|
||||
Endpoint::Sessions& deadSessions)
|
||||
{
|
||||
auto itr = remoteSessions.begin();
|
||||
while(itr != remoteSessions.end())
|
||||
{
|
||||
if(itr->second->Tick(now))
|
||||
{
|
||||
itr->second->Stop();
|
||||
deadSessions.emplace(std::move(*itr));
|
||||
itr = remoteSessions.erase(itr);
|
||||
}
|
||||
else
|
||||
{
|
||||
++itr;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
EndpointUtil::ExpireConvoSessions(llarp_time_t now,
|
||||
Endpoint::ConvoMap& sessions)
|
||||
{
|
||||
auto itr = sessions.begin();
|
||||
while(itr != sessions.end())
|
||||
{
|
||||
if(itr->second.IsExpired(now))
|
||||
itr = sessions.erase(itr);
|
||||
else
|
||||
++itr;
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
EndpointUtil::StopRemoteSessions(Endpoint::Sessions& remoteSessions)
|
||||
{
|
||||
for(auto& item : remoteSessions)
|
||||
{
|
||||
item.second->Stop();
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
EndpointUtil::StopSnodeSessions(Endpoint::SNodeSessions& sessions)
|
||||
{
|
||||
for(auto& item : sessions)
|
||||
{
|
||||
item.second->Stop();
|
||||
}
|
||||
}
|
||||
|
||||
bool
|
||||
EndpointUtil::HasPathToService(const Address& addr,
|
||||
const Endpoint::Sessions& remoteSessions)
|
||||
{
|
||||
auto range = remoteSessions.equal_range(addr);
|
||||
auto itr = range.first;
|
||||
while(itr != range.second)
|
||||
{
|
||||
if(itr->second->ReadyToSend())
|
||||
return true;
|
||||
++itr;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
} // namespace service
|
||||
} // namespace llarp
|
46
llarp/service/endpoint_util.hpp
Normal file
46
llarp/service/endpoint_util.hpp
Normal file
|
@ -0,0 +1,46 @@
|
|||
#ifndef LLARP_SERVICE_ENDPOINT_UTIL_HPP
|
||||
#define LLARP_SERVICE_ENDPOINT_UTIL_HPP
|
||||
|
||||
#include <service/endpoint.hpp>
|
||||
|
||||
namespace llarp
|
||||
{
|
||||
namespace service
|
||||
{
|
||||
struct EndpointUtil
|
||||
{
|
||||
static void
|
||||
ExpireSNodeSessions(llarp_time_t now, Endpoint::SNodeSessions& sessions);
|
||||
|
||||
static void
|
||||
ExpirePendingTx(llarp_time_t now, Endpoint::PendingLookups& lookups);
|
||||
|
||||
static void
|
||||
ExpirePendingRouterLookups(llarp_time_t now,
|
||||
Endpoint::PendingRouters& routers);
|
||||
|
||||
static void
|
||||
DeregisterDeadSessions(llarp_time_t now, Endpoint::Sessions& sessions);
|
||||
|
||||
static void
|
||||
TickRemoteSessions(llarp_time_t now, Endpoint::Sessions& remoteSessions,
|
||||
Endpoint::Sessions& deadSessions);
|
||||
|
||||
static void
|
||||
ExpireConvoSessions(llarp_time_t now, Endpoint::ConvoMap& sessions);
|
||||
|
||||
static void
|
||||
StopRemoteSessions(Endpoint::Sessions& remoteSessions);
|
||||
|
||||
static void
|
||||
StopSnodeSessions(Endpoint::SNodeSessions& sessions);
|
||||
|
||||
static bool
|
||||
HasPathToService(const Address& addr,
|
||||
const Endpoint::Sessions& remoteSessions);
|
||||
};
|
||||
} // namespace service
|
||||
|
||||
} // namespace llarp
|
||||
|
||||
#endif
|
Loading…
Reference in a new issue