fix up client to snode codepath

This commit is contained in:
Jeff Becker 2021-04-07 08:03:04 -04:00
parent 8b2ede5fc5
commit 4889b8cddf
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05
6 changed files with 57 additions and 35 deletions

View File

@ -127,6 +127,7 @@ namespace llarp
if (b == 0s)
{
llarp::LogInfo("obtained an exit via ", p->Endpoint());
m_CurrentPath = p->RXID();
CallPendingCallbacks(true);
}
return true;
@ -257,6 +258,8 @@ namespace llarp
bool
BaseSession::IsReady() const
{
if (m_CurrentPath.IsZero())
return false;
const size_t expect = (1 + (numDesiredPaths / 2));
return AvailablePaths(llarp::path::ePathRoleExit) >= expect;
}

View File

@ -114,6 +114,14 @@ namespace llarp
return m_ExitRouter;
}
std::optional<PathID_t>
CurrentPath() const
{
if (m_CurrentPath.IsZero())
return std::nullopt;
return m_CurrentPath;
}
bool
IsExpired(llarp_time_t now) const;
@ -151,6 +159,8 @@ namespace llarp
using TieredQueue_t = std::map<uint8_t, UpstreamTrafficQueue_t>;
TieredQueue_t m_Upstream;
PathID_t m_CurrentPath;
using DownstreamPkt = std::pair<uint64_t, llarp::net::IPPacket>;
struct DownstreamPktSorter

View File

@ -170,9 +170,10 @@ namespace llarp
for (const auto& item : m_state->m_SNodeSessions)
{
if (item.second.second == tag)
if (const auto maybe = item.second->CurrentPath())
{
return item.first;
if (ConvoTag{maybe->as_array()} == tag)
return item.first;
}
}
return std::nullopt;
@ -616,7 +617,7 @@ namespace llarp
});
};
resetState(m_state->m_RemoteSessions, [](const auto& item) { return item.second; });
resetState(m_state->m_SNodeSessions, [](const auto& item) { return item.second.first; });
resetState(m_state->m_SNodeSessions, [](const auto& item) { return item.second; });
}
bool
@ -1006,7 +1007,7 @@ namespace llarp
auto itr = range.first;
while (itr != range.second)
{
if (itr->second.first->IsReady())
if (itr->second->IsReady())
{
return true;
}
@ -1359,10 +1360,6 @@ namespace llarp
using namespace std::placeholders;
if (nodeSessions.count(snode) == 0)
{
ConvoTag tag;
// TODO: check for collision lol no we don't but maybe we will...
// some day :DDDDD
tag.Randomize();
const auto src = xhtonl(net::TruncateV6(GetIfAddr()));
const auto dst = xhtonl(net::TruncateV6(ObtainIPForAddr(snode)));
@ -1374,27 +1371,42 @@ namespace llarp
return false;
pkt.UpdateIPv4Address(src, dst);
/// TODO: V6
return HandleInboundPacket(tag, pkt.ConstBuffer(), ProtocolType::TrafficV4, 0);
auto itr = m_state->m_SNodeSessions.find(snode);
if (itr == m_state->m_SNodeSessions.end())
return false;
if (const auto maybe = itr->second->CurrentPath())
return HandleInboundPacket(
ConvoTag{maybe->as_array()}, pkt.ConstBuffer(), ProtocolType::TrafficV4, 0);
return false;
},
Router(),
numDesiredPaths,
1,
numHops,
false,
this);
m_state->m_SNodeSessions.emplace(snode, std::make_pair(session, tag));
m_state->m_SNodeSessions[snode] = session;
}
EnsureRouterIsKnown(snode);
auto range = nodeSessions.equal_range(snode);
auto itr = range.first;
while (itr != range.second)
{
if (itr->second.first->IsReady())
h(snode, itr->second.first, itr->second.second);
if (itr->second->IsReady())
h(snode, itr->second, ConvoTag{itr->second->CurrentPath()->as_array()});
else
{
itr->second.first->AddReadyHook(std::bind(h, snode, _1, itr->second.second));
itr->second.first->BuildOne();
itr->second->AddReadyHook([h, snode](auto session) {
if (session)
{
h(snode, session, ConvoTag{session->CurrentPath()->as_array()});
}
else
{
h(snode, nullptr, ConvoTag{});
}
});
if (not itr->second->BuildCooldownHit(Now()))
itr->second->BuildOne();
}
++itr;
}
@ -1440,10 +1452,9 @@ namespace llarp
auto pkt = std::make_shared<net::IPPacket>();
if (!pkt->Load(buf))
return false;
EnsurePathToSNode(addr, [=](RouterID, exit::BaseSession_ptr s, ConvoTag tag) {
EnsurePathToSNode(addr, [=](RouterID, exit::BaseSession_ptr s, ConvoTag) {
if (s)
{
ConvoTagTX(tag);
s->SendPacketToRemote(pkt->ConstBuffer(), t);
}
});
@ -1455,7 +1466,7 @@ namespace llarp
FlushRecvData();
// send downstream packets to user for snode
for (const auto& [router, session] : m_state->m_SNodeSessions)
session.first->FlushDownstream();
session->FlushDownstream();
// handle inbound traffic sorted
std::priority_queue<ProtocolMessage> queue;
@ -1492,7 +1503,7 @@ namespace llarp
outctx->FlushUpstream();
// TODO: locking on this container
for (const auto& [router, session] : m_state->m_SNodeSessions)
session.first->FlushUpstream();
session->FlushUpstream();
// send queue flush
while (not m_SendQueue.empty())
@ -1563,13 +1574,13 @@ namespace llarp
}
return ret;
}
if (auto ptr = std::get_if<RouterID>(&remote))
if (auto* ptr = std::get_if<RouterID>(&remote))
{
for (const auto& item : m_state->m_SNodeSessions)
{
if (item.first == *ptr)
return item.second.second;
}
auto itr = m_state->m_SNodeSessions.find(*ptr);
if (itr == m_state->m_SNodeSessions.end())
return std::nullopt;
if (auto maybe = itr->second->CurrentPath())
return ConvoTag{maybe->as_array()};
}
return std::nullopt;
}

View File

@ -83,7 +83,7 @@ namespace llarp
m_SNodeSessions.begin(),
m_SNodeSessions.end(),
std::back_inserter(obj["snodeSessions"]),
[](const auto& item) { return item.second.first->ExtractStatus(); });
[](const auto& item) { return item.second->ExtractStatus(); });
util::StatusObject sessionObj{};

View File

@ -41,9 +41,7 @@ namespace llarp
using Sessions = std::unordered_multimap<Address, std::shared_ptr<OutboundContext>>;
using SNodeSessionValue = std::pair<std::shared_ptr<exit::BaseSession>, ConvoTag>;
using SNodeSessions = std::unordered_multimap<RouterID, SNodeSessionValue>;
using SNodeSessions = std::unordered_map<RouterID, std::shared_ptr<exit::BaseSession>>;
using ConvoMap = std::unordered_map<ConvoTag, Session>;

View File

@ -15,19 +15,19 @@ namespace llarp
auto itr = sessions.begin();
while (itr != sessions.end())
{
if (itr->second.first->ShouldRemove() && itr->second.first->IsStopped())
if (itr->second->ShouldRemove() && itr->second->IsStopped())
{
itr = sessions.erase(itr);
continue;
}
// expunge next tick
if (itr->second.first->IsExpired(now))
if (itr->second->IsExpired(now))
{
itr->second.first->Stop();
itr->second->Stop();
}
else
{
itr->second.first->Tick(now);
itr->second->Tick(now);
}
++itr;
@ -138,7 +138,7 @@ namespace llarp
{
for (auto& item : sessions)
{
item.second.first->Stop();
item.second->Stop();
}
}