mirror of https://github.com/oxen-io/lokinet
start work on seperating ips out of endpoint
This commit is contained in:
parent
64e9622270
commit
d6ec528a72
|
@ -162,7 +162,7 @@ craete a new convseration on a loki address (bidi)
|
|||
{
|
||||
A: "make-convo",
|
||||
C: "<16 bytes session cookie>",
|
||||
R: "<32 bytes loki address>",
|
||||
R: "human readable address .snode/.loki",
|
||||
T: "<16 bytes convo tag>",
|
||||
Y: sequence_num_uint64,
|
||||
Z: "<32 bytes keyed hash>"
|
||||
|
@ -173,7 +173,7 @@ infrom the status of a converstation on a loki address (S->C)
|
|||
sent every 500 ms until fully established for outbound convos and immediately
|
||||
when a new inbound conversation is made.
|
||||
|
||||
S bit 0 (LSB): we found the introset for (set by outbound)
|
||||
S bit 0 (LSB): we found the introset/endpoint for (set by outbound)
|
||||
S bit 1: we found the router to align on (set by outbound)
|
||||
S bit 2: we have a path right now (set by outbound)
|
||||
S bit 3: we have made the converstation (set by both)
|
||||
|
@ -182,7 +182,7 @@ S bit 4: we are an inbound converstation (set by inbound)
|
|||
{
|
||||
A: "convo",
|
||||
C: "<16 bytes session cookie>",
|
||||
R: "<32 bytes loki remote address>",
|
||||
R: "human readable address .snode/.loki",
|
||||
S: bitmask_status_uint64,
|
||||
Y: sequence_num_uint64,
|
||||
Z: "<32 bytes keyed hash>"
|
||||
|
|
|
@ -18,8 +18,8 @@ namespace llarp
|
|||
}
|
||||
|
||||
bool
|
||||
HandleIPPacket(const AlignedBuffer< 32 >, const llarp_buffer_t &,
|
||||
bool) override
|
||||
HandleInboundPacket(const service::ConvoTag, const llarp_buffer_t &,
|
||||
service::ProtocolType) override
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -739,7 +739,6 @@ namespace llarp
|
|||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if(m_SNodes.at(itr->second))
|
||||
{
|
||||
sendFunc = std::bind(&TunEndpoint::SendToSNodeOrQueue, this,
|
||||
|
|
|
@ -82,12 +82,18 @@ namespace llarp
|
|||
|
||||
/// overrides Endpoint
|
||||
bool
|
||||
HandleIPPacket(const AlignedBuffer< 32 > addr, const llarp_buffer_t& buf,
|
||||
bool serviceNode) override
|
||||
HandleInboundPacket(const service::ConvoTag tag,
|
||||
const llarp_buffer_t& pkt,
|
||||
service::ProtocolType t) override
|
||||
{
|
||||
return HandleWriteIPPacket(buf, [=]() -> huint128_t {
|
||||
return ObtainIPForAddr(addr, serviceNode);
|
||||
});
|
||||
if(t != service::eProtocolTrafficV4 && t != service::eProtocolTrafficV6)
|
||||
return false;
|
||||
AlignedBuffer< 32 > addr;
|
||||
bool snode = false;
|
||||
if(!GetEndpointWithConvoTag(tag, addr, snode))
|
||||
return false;
|
||||
return HandleWriteIPPacket(
|
||||
pkt, [=]() -> huint128_t { return ObtainIPForAddr(addr, snode); });
|
||||
}
|
||||
|
||||
/// handle inbound traffic
|
||||
|
|
|
@ -167,9 +167,14 @@ namespace llarp
|
|||
if(ShouldBuildMore(now))
|
||||
BuildOne();
|
||||
TickPaths(now, router);
|
||||
if(m_BuildStats.SuccsessRatio() <= BuildStats::MinGoodRatio)
|
||||
if(m_BuildStats.attempts > 50)
|
||||
{
|
||||
LogWarn(Name(), " has a low path build success. ", m_BuildStats);
|
||||
if(m_BuildStats.SuccsessRatio() <= BuildStats::MinGoodRatio
|
||||
&& now - m_LastWarn > 5000)
|
||||
{
|
||||
LogWarn(Name(), " has a low path build success. ", m_BuildStats);
|
||||
m_LastWarn = now;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -15,6 +15,9 @@ namespace llarp
|
|||
|
||||
struct Builder : public PathSet
|
||||
{
|
||||
private:
|
||||
llarp_time_t m_LastWarn = 0;
|
||||
|
||||
protected:
|
||||
/// flag for PathSet::Stop()
|
||||
std::atomic< bool > _run;
|
||||
|
|
|
@ -311,7 +311,7 @@ namespace llarp
|
|||
BuildStats::ToString() const
|
||||
{
|
||||
std::stringstream ss;
|
||||
ss << SuccsessRatio() << " percent success ";
|
||||
ss << (SuccsessRatio() * 100.0) << " percent success ";
|
||||
ss << "(success=" << success << " ";
|
||||
ss << "attempts=" << attempts << " ";
|
||||
ss << "timeouts=" << timeouts << " ";
|
||||
|
@ -323,8 +323,8 @@ namespace llarp
|
|||
BuildStats::SuccsessRatio() const
|
||||
{
|
||||
if(attempts)
|
||||
return success / attempts;
|
||||
return 0;
|
||||
return double(success) / double(attempts);
|
||||
return 0.0;
|
||||
}
|
||||
|
||||
bool
|
||||
|
|
|
@ -192,6 +192,33 @@ namespace llarp
|
|||
return m_PendingRouters.find(remote) != m_PendingRouters.end();
|
||||
}
|
||||
|
||||
bool
|
||||
Endpoint::GetEndpointWithConvoTag(const ConvoTag tag,
|
||||
llarp::AlignedBuffer< 32 >& addr,
|
||||
bool& snode) const
|
||||
{
|
||||
auto itr = m_Sessions.find(tag);
|
||||
if(itr != m_Sessions.end())
|
||||
{
|
||||
snode = false;
|
||||
addr = itr->second.remote.Addr();
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
for(const auto& item : m_SNodeSessions)
|
||||
{
|
||||
if(item.second.second == tag)
|
||||
{
|
||||
snode = true;
|
||||
addr = item.first;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
bool
|
||||
Endpoint::IntrosetIsStale() const
|
||||
{
|
||||
|
@ -210,11 +237,15 @@ namespace llarp
|
|||
|
||||
if(!m_Tag.IsZero())
|
||||
obj.Put("tag", m_Tag.ToString());
|
||||
|
||||
obj.PutContainer("deadSessions", m_DeadSessions);
|
||||
obj.PutContainer("remoteSessions", m_RemoteSessions);
|
||||
obj.PutContainer("snodeSessions", m_SNodeSessions);
|
||||
obj.PutContainer("lookups", m_PendingLookups);
|
||||
static auto getSecond = [](const auto& item) -> const auto&
|
||||
{
|
||||
return item.second;
|
||||
};
|
||||
obj.PutContainer("deadSessions", m_DeadSessions, getSecond);
|
||||
obj.PutContainer("remoteSessions", m_RemoteSessions, getSecond);
|
||||
obj.PutContainer("lookups", m_PendingLookups, getSecond);
|
||||
obj.PutContainer("snodeSessions", m_SNodeSessions,
|
||||
[](const auto& item) { return item.second.first; });
|
||||
|
||||
util::StatusObject sessionObj{};
|
||||
|
||||
|
@ -616,12 +647,15 @@ namespace llarp
|
|||
Endpoint::ResetInternalState()
|
||||
{
|
||||
path::Builder::ResetInternalState();
|
||||
static auto resetState = [](auto& container) {
|
||||
std::for_each(container.begin(), container.end(),
|
||||
[](auto& item) { item.second->ResetInternalState(); });
|
||||
static auto resetState = [](auto& container, auto getter) {
|
||||
std::for_each(container.begin(), container.end(), [getter](auto& item) {
|
||||
getter(item)->ResetInternalState();
|
||||
});
|
||||
};
|
||||
resetState(m_RemoteSessions);
|
||||
resetState(m_SNodeSessions);
|
||||
resetState(m_RemoteSessions,
|
||||
[](const auto& item) { return item.second; });
|
||||
resetState(m_SNodeSessions,
|
||||
[](const auto& item) { return item.second.first; });
|
||||
}
|
||||
|
||||
bool
|
||||
|
@ -850,7 +884,7 @@ namespace llarp
|
|||
auto itr = range.first;
|
||||
while(itr != range.second)
|
||||
{
|
||||
if(itr->second->IsReady())
|
||||
if(itr->second.first->IsReady())
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
@ -1011,25 +1045,30 @@ namespace llarp
|
|||
using namespace std::placeholders;
|
||||
if(m_SNodeSessions.count(snode) == 0)
|
||||
{
|
||||
ConvoTag tag;
|
||||
// TODO: check for collision lol no we don't but maybe we will...
|
||||
// some day :DDDDD
|
||||
tag.Randomize();
|
||||
auto session = std::make_shared< exit::SNodeSession >(
|
||||
snode,
|
||||
[=](const llarp_buffer_t& pkt) -> bool {
|
||||
return HandleIPPacket(snode, pkt, true);
|
||||
/// TODO: V6
|
||||
return HandleInboundPacket(tag, pkt, eProtocolTrafficV4);
|
||||
},
|
||||
m_Router, m_NumPaths, numHops, false, ShouldBundleRC());
|
||||
m_SNodeSessions.emplace(snode, session);
|
||||
m_SNodeSessions.emplace(snode, std::make_pair(session, tag));
|
||||
}
|
||||
EnsureRouterIsKnown(snode);
|
||||
auto range = m_SNodeSessions.equal_range(snode);
|
||||
auto itr = range.first;
|
||||
while(itr != range.second)
|
||||
{
|
||||
if(itr->second->IsReady())
|
||||
h(snode, itr->second);
|
||||
if(itr->second.first->IsReady())
|
||||
h(snode, itr->second.first);
|
||||
else
|
||||
{
|
||||
itr->second->AddReadyHook(std::bind(h, snode, _1));
|
||||
itr->second->BuildOne();
|
||||
itr->second.first->AddReadyHook(std::bind(h, snode, _1));
|
||||
itr->second.first->BuildOne();
|
||||
}
|
||||
++itr;
|
||||
}
|
||||
|
@ -1054,14 +1093,14 @@ namespace llarp
|
|||
EndpointLogic()->queue_func([&]() {
|
||||
// send downstream packets to user for snode
|
||||
for(const auto& item : m_SNodeSessions)
|
||||
item.second->FlushDownstream();
|
||||
item.second.first->FlushDownstream();
|
||||
// send downstream traffic to user for hidden service
|
||||
util::Lock lock(&m_InboundTrafficQueueMutex);
|
||||
while(m_InboundTrafficQueue.size())
|
||||
{
|
||||
const auto& msg = m_InboundTrafficQueue.top();
|
||||
llarp_buffer_t buf(msg->payload);
|
||||
HandleIPPacket(msg->sender.Addr(), buf, false);
|
||||
HandleInboundPacket(msg->tag, buf, msg->proto);
|
||||
m_InboundTrafficQueue.pop();
|
||||
}
|
||||
});
|
||||
|
@ -1072,7 +1111,7 @@ namespace llarp
|
|||
item.second->FlushUpstream();
|
||||
// TODO: locking on this container
|
||||
for(const auto& item : m_SNodeSessions)
|
||||
item.second->FlushUpstream();
|
||||
item.second.first->FlushUpstream();
|
||||
util::Lock lock(&m_SendQueueMutex);
|
||||
// send outbound traffic
|
||||
for(const auto& item : m_SendQueue)
|
||||
|
@ -1080,6 +1119,15 @@ namespace llarp
|
|||
m_SendQueue.clear();
|
||||
}
|
||||
|
||||
bool
|
||||
Endpoint::EnsureConvo(const AlignedBuffer< 32 > addr, bool snode,
|
||||
ConvoEventListener_ptr ev)
|
||||
{
|
||||
if(snode)
|
||||
{
|
||||
}
|
||||
}
|
||||
|
||||
bool
|
||||
Endpoint::SendToServiceOrQueue(const service::Address& remote,
|
||||
const llarp_buffer_t& data, ProtocolType t)
|
||||
|
|
|
@ -34,6 +34,29 @@ namespace llarp
|
|||
struct Context;
|
||||
struct OutboundContext;
|
||||
|
||||
struct IConvoEventListener
|
||||
{
|
||||
~IConvoEventListener() = default;
|
||||
|
||||
/// called when we have obtained the introset
|
||||
/// called with nullptr on not found or when we
|
||||
/// talking to a snode
|
||||
virtual void
|
||||
FoundIntroSet(const IntroSet*) = 0;
|
||||
|
||||
/// called when we found the RC we need for alignment
|
||||
virtual void
|
||||
FoundRC(const RouterContact) = 0;
|
||||
|
||||
/// called when we have successfully built an aligned path
|
||||
virtual void GotAlignedPath(path::Path_ptr) = 0;
|
||||
|
||||
/// called when we have established a session or conversation
|
||||
virtual void
|
||||
MadeConvo(const ConvoTag) = 0;
|
||||
};
|
||||
using ConvoEventListener_ptr = std::shared_ptr< IConvoEventListener >;
|
||||
|
||||
struct Endpoint : public path::Builder,
|
||||
public ILookupHolder,
|
||||
public IDataHandler
|
||||
|
@ -168,9 +191,10 @@ namespace llarp
|
|||
HandleDataMessage(path::Path_ptr path, const PathID_t from,
|
||||
std::shared_ptr< ProtocolMessage > msg) override;
|
||||
|
||||
/// handle packet io from service node or hidden service to frontend
|
||||
virtual bool
|
||||
HandleIPPacket(const AlignedBuffer< 32 > addr, const llarp_buffer_t& pkt,
|
||||
bool serviceNode) = 0;
|
||||
HandleInboundPacket(const ConvoTag tag, const llarp_buffer_t& pkt,
|
||||
ProtocolType t) = 0;
|
||||
|
||||
// virtual bool
|
||||
// HandleWriteIPPacket(const llarp_buffer_t& pkt,
|
||||
|
@ -208,11 +232,13 @@ namespace llarp
|
|||
HandlePathBuilt(path::Path_ptr path) override;
|
||||
|
||||
bool
|
||||
SendToServiceOrQueue(const service::Address& addr,
|
||||
const llarp_buffer_t& payload, ProtocolType t);
|
||||
EnsureConvo(const AlignedBuffer< 32 > addr, bool snode,
|
||||
ConvoEventListener_ptr ev);
|
||||
|
||||
bool
|
||||
SendToSNodeOrQueue(const RouterID& addr, const llarp_buffer_t& payload);
|
||||
SendTo(const ConvoTag tag, const llarp_buffer_t& pkt, ProtocolType t);
|
||||
|
||||
;
|
||||
|
||||
bool
|
||||
HandleDataDrop(path::Path_ptr p, const PathID_t& dst, uint64_t s);
|
||||
|
@ -228,6 +254,15 @@ namespace llarp
|
|||
static void
|
||||
HandlePathDead(void*);
|
||||
|
||||
/// return true if we have a convotag as an exit session
|
||||
/// or as a hidden service session
|
||||
/// set addr and issnode
|
||||
///
|
||||
/// return false if we don't have either
|
||||
bool
|
||||
GetEndpointWithConvoTag(const ConvoTag t, AlignedBuffer< 32 >& addr,
|
||||
bool& issnode) const;
|
||||
|
||||
bool
|
||||
HasConvoTag(const ConvoTag& t) const override;
|
||||
|
||||
|
@ -315,6 +350,12 @@ namespace llarp
|
|||
GenTXID();
|
||||
|
||||
protected:
|
||||
bool
|
||||
SendToServiceOrQueue(const service::Address& addr,
|
||||
const llarp_buffer_t& payload, ProtocolType t);
|
||||
bool
|
||||
SendToSNodeOrQueue(const RouterID& addr, const llarp_buffer_t& payload);
|
||||
|
||||
/// parent context that owns this endpoint
|
||||
Context* const context;
|
||||
|
||||
|
@ -441,8 +482,12 @@ namespace llarp
|
|||
std::unordered_multimap< Address, std::shared_ptr< OutboundContext >,
|
||||
Address::Hash >;
|
||||
|
||||
using SNodeSessions = std::unordered_multimap<
|
||||
RouterID, std::shared_ptr< exit::BaseSession >, RouterID::Hash >;
|
||||
using SNodeSessionValue =
|
||||
std::pair< std::shared_ptr< exit::BaseSession >, ConvoTag >;
|
||||
|
||||
using SNodeSessions =
|
||||
std::unordered_multimap< RouterID, SNodeSessionValue,
|
||||
RouterID::Hash >;
|
||||
|
||||
using ConvoMap = std::unordered_map< ConvoTag, Session, ConvoTag::Hash >;
|
||||
|
||||
|
|
|
@ -14,19 +14,19 @@ namespace llarp
|
|||
auto itr = sessions.begin();
|
||||
while(itr != sessions.end())
|
||||
{
|
||||
if(itr->second->ShouldRemove() && itr->second->IsStopped())
|
||||
if(itr->second.first->ShouldRemove() && itr->second.first->IsStopped())
|
||||
{
|
||||
itr = sessions.erase(itr);
|
||||
continue;
|
||||
}
|
||||
// expunge next tick
|
||||
if(itr->second->IsExpired(now))
|
||||
if(itr->second.first->IsExpired(now))
|
||||
{
|
||||
itr->second->Stop();
|
||||
itr->second.first->Stop();
|
||||
}
|
||||
else
|
||||
{
|
||||
itr->second->Tick(now);
|
||||
itr->second.first->Tick(now);
|
||||
}
|
||||
|
||||
++itr;
|
||||
|
@ -139,7 +139,7 @@ namespace llarp
|
|||
{
|
||||
for(auto& item : sessions)
|
||||
{
|
||||
item.second->Stop();
|
||||
item.second.first->Stop();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -35,15 +35,15 @@ namespace llarp
|
|||
void
|
||||
Put(const value_type& value);
|
||||
|
||||
template < typename Container >
|
||||
template < typename Container, typename Getter >
|
||||
void
|
||||
PutContainer(String_t keyname, const Container& container)
|
||||
PutContainer(String_t keyname, const Container& container, Getter get)
|
||||
{
|
||||
std::vector< util::StatusObject > objs;
|
||||
std::transform(container.begin(), container.end(),
|
||||
std::back_inserter(objs),
|
||||
[](const auto& item) -> util::StatusObject {
|
||||
return item.second->ExtractStatus();
|
||||
[get](const auto& item) -> util::StatusObject {
|
||||
return get(item)->ExtractStatus();
|
||||
});
|
||||
Put(keyname, objs);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue