mirror of https://github.com/oxen-io/lokinet
correct the logic for inbound convos
send back traffic on the correct path
This commit is contained in:
parent
5da3bb6c0a
commit
b70ecade2b
|
@ -400,7 +400,7 @@ namespace llarp
|
|||
{
|
||||
for (const auto& item : Sessions())
|
||||
{
|
||||
if (item.second.remote.Addr() == addr && item.second.inbound)
|
||||
if (item.second.remote.Addr() == addr and item.second.inbound)
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
|
@ -421,7 +421,7 @@ namespace llarp
|
|||
Endpoint::PutSenderFor(const ConvoTag& tag, const ServiceInfo& info, bool inbound)
|
||||
{
|
||||
auto itr = Sessions().find(tag);
|
||||
if (itr == Sessions().end())
|
||||
if (itr == Sessions().end() and not(WantsOutboundSession(info.Addr()) and inbound))
|
||||
{
|
||||
itr = Sessions().emplace(tag, Session{}).first;
|
||||
itr->second.inbound = inbound;
|
||||
|
@ -1079,14 +1079,7 @@ namespace llarp
|
|||
{
|
||||
msg->sender.UpdateAddr();
|
||||
PutSenderFor(msg->tag, msg->sender, true);
|
||||
PutReplyIntroFor(msg->tag, path->intro);
|
||||
Introduction intro;
|
||||
intro.pathID = from;
|
||||
intro.router = PubKey{path->Endpoint()};
|
||||
intro.expiresAt = std::min(path->ExpireTime(), msg->introReply.expiresAt);
|
||||
intro.latency = path->intro.latency;
|
||||
PutIntroFor(msg->tag, intro);
|
||||
PutReplyIntroFor(msg->tag, path->intro);
|
||||
PutReplyIntroFor(msg->tag, msg->introReply);
|
||||
ConvoTagRX(msg->tag);
|
||||
return ProcessDataMessage(msg);
|
||||
}
|
||||
|
@ -1792,11 +1785,11 @@ namespace llarp
|
|||
return false;
|
||||
}
|
||||
|
||||
// inbound conversation
|
||||
const auto now = Now();
|
||||
|
||||
if (HasInboundConvo(remote))
|
||||
{
|
||||
// inbound conversation
|
||||
LogTrace("Have inbound convo");
|
||||
auto transfer = std::make_shared<routing::PathTransferMessage>();
|
||||
ProtocolFrame& f = transfer->T;
|
||||
|
@ -1806,86 +1799,85 @@ namespace llarp
|
|||
{
|
||||
// the remote guy's intro
|
||||
Introduction remoteIntro;
|
||||
Introduction replyPath;
|
||||
SharedSecret K;
|
||||
const auto tag = *maybe;
|
||||
|
||||
if (!GetCachedSessionKeyFor(tag, K))
|
||||
if (not GetCachedSessionKeyFor(tag, K))
|
||||
{
|
||||
LogError("no cached key for T=", tag);
|
||||
LogError(Name(), " no cached key for inbound session from ", remote, " T=", tag);
|
||||
return false;
|
||||
}
|
||||
if (!GetIntroFor(tag, remoteIntro))
|
||||
if (not GetReplyIntroFor(tag, remoteIntro))
|
||||
{
|
||||
LogError("no intro for T=", tag);
|
||||
LogError(Name(), "no reply intro for inbound session from ", remote, " T=", tag);
|
||||
return false;
|
||||
}
|
||||
if (GetReplyIntroFor(tag, replyPath))
|
||||
{
|
||||
// get path for intro
|
||||
ForEachPath([&](path::Path_ptr path) {
|
||||
if (path->intro == replyPath)
|
||||
{
|
||||
p = path;
|
||||
return;
|
||||
}
|
||||
if (p && p->ExpiresSoon(now) && path->IsReady()
|
||||
&& path->intro.router == replyPath.router)
|
||||
{
|
||||
p = path;
|
||||
}
|
||||
});
|
||||
}
|
||||
else
|
||||
p = GetPathByRouter(remoteIntro.router);
|
||||
// get path for intro
|
||||
auto p = GetPathByRouter(remoteIntro.router);
|
||||
|
||||
if (p)
|
||||
if (not p)
|
||||
{
|
||||
f.T = tag;
|
||||
// TODO: check expiration of our end
|
||||
auto m = std::make_shared<ProtocolMessage>(f.T);
|
||||
m->PutBuffer(data);
|
||||
f.N.Randomize();
|
||||
f.C.Zero();
|
||||
f.R = 0;
|
||||
transfer->Y.Randomize();
|
||||
m->proto = t;
|
||||
m->introReply = p->intro;
|
||||
PutReplyIntroFor(f.T, m->introReply);
|
||||
m->sender = m_Identity.pub;
|
||||
if (auto maybe = GetSeqNoForConvo(f.T))
|
||||
{
|
||||
m->seqno = *maybe;
|
||||
}
|
||||
else
|
||||
{
|
||||
LogWarn(Name(), " no session T=", f.T);
|
||||
return false;
|
||||
}
|
||||
f.S = m->seqno;
|
||||
f.F = m->introReply.pathID;
|
||||
transfer->P = remoteIntro.pathID;
|
||||
auto self = this;
|
||||
Router()->QueueWork([transfer, p, m, K, self]() {
|
||||
if (not transfer->T.EncryptAndSign(*m, K, self->m_Identity))
|
||||
{
|
||||
LogError("failed to encrypt and sign");
|
||||
return;
|
||||
}
|
||||
self->m_SendQueue.tryPushBack(SendEvent_t{transfer, p});
|
||||
});
|
||||
return true;
|
||||
LogWarn(
|
||||
Name(),
|
||||
" has no path for intro router ",
|
||||
RouterID{remoteIntro.router},
|
||||
" for inbound convo T=",
|
||||
tag);
|
||||
return false;
|
||||
}
|
||||
|
||||
f.T = tag;
|
||||
// TODO: check expiration of our end
|
||||
auto m = std::make_shared<ProtocolMessage>(f.T);
|
||||
m->PutBuffer(data);
|
||||
f.N.Randomize();
|
||||
f.C.Zero();
|
||||
f.R = 0;
|
||||
transfer->Y.Randomize();
|
||||
m->proto = t;
|
||||
m->introReply = p->intro;
|
||||
m->sender = m_Identity.pub;
|
||||
if (auto maybe = GetSeqNoForConvo(f.T))
|
||||
{
|
||||
m->seqno = *maybe;
|
||||
}
|
||||
else
|
||||
{
|
||||
LogTrace("SendToOrQueue failed to return via inbound: no path");
|
||||
LogWarn(Name(), " could not set sequence number, no session T=", f.T);
|
||||
return false;
|
||||
}
|
||||
f.S = m->seqno;
|
||||
f.F = m->introReply.pathID;
|
||||
transfer->P = remoteIntro.pathID;
|
||||
auto self = this;
|
||||
Router()->QueueWork([transfer, p, m, K, self]() {
|
||||
if (not transfer->T.EncryptAndSign(*m, K, self->m_Identity))
|
||||
{
|
||||
LogError("failed to encrypt and sign for sessionn T=", transfer->T.T);
|
||||
return;
|
||||
}
|
||||
self->m_SendQueue.tryPushBack(SendEvent_t{transfer, p});
|
||||
});
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
LogWarn("Have inbound convo from ", remote, " but get-best returned none; bug?");
|
||||
LogWarn(
|
||||
Name(),
|
||||
" SendToOrQueue on inbound convo from ",
|
||||
remote,
|
||||
" but get-best returned none; bug?");
|
||||
}
|
||||
}
|
||||
if (not WantsOutboundSession(remote))
|
||||
{
|
||||
LogWarn(
|
||||
Name(),
|
||||
" SendToOrQueue on outbound session we did not mark as outbound (remote=",
|
||||
remote,
|
||||
")");
|
||||
return false;
|
||||
}
|
||||
|
||||
// Failed to find a suitable inbound convo, look for outbound
|
||||
LogTrace("Not an inbound convo");
|
||||
|
@ -1900,34 +1892,28 @@ namespace llarp
|
|||
return true;
|
||||
}
|
||||
}
|
||||
// if we want to make an outbound session
|
||||
if (WantsOutboundSession(remote))
|
||||
{
|
||||
LogTrace("Making an outbound session and queuing the data");
|
||||
// add pending traffic
|
||||
auto& traffic = m_state->m_PendingTraffic;
|
||||
traffic[remote].emplace_back(data, t);
|
||||
EnsurePathToService(
|
||||
remote,
|
||||
[self = this](Address addr, OutboundContext* ctx) {
|
||||
if (ctx)
|
||||
LogTrace("Making an outbound session and queuing the data");
|
||||
// add pending traffic
|
||||
auto& traffic = m_state->m_PendingTraffic;
|
||||
traffic[remote].emplace_back(data, t);
|
||||
EnsurePathToService(
|
||||
remote,
|
||||
[self = this](Address addr, OutboundContext* ctx) {
|
||||
if (ctx)
|
||||
{
|
||||
for (auto& pending : self->m_state->m_PendingTraffic[addr])
|
||||
{
|
||||
for (auto& pending : self->m_state->m_PendingTraffic[addr])
|
||||
{
|
||||
ctx->AsyncEncryptAndSendTo(pending.Buffer(), pending.protocol);
|
||||
}
|
||||
ctx->AsyncEncryptAndSendTo(pending.Buffer(), pending.protocol);
|
||||
}
|
||||
else
|
||||
{
|
||||
LogWarn("no path made to ", addr);
|
||||
}
|
||||
self->m_state->m_PendingTraffic.erase(addr);
|
||||
},
|
||||
PathAlignmentTimeout());
|
||||
return true;
|
||||
}
|
||||
LogDebug("SendOrQueue failed: no inbound/outbound sessions");
|
||||
return false;
|
||||
}
|
||||
else
|
||||
{
|
||||
LogWarn("no path made to ", addr);
|
||||
}
|
||||
self->m_state->m_PendingTraffic.erase(addr);
|
||||
},
|
||||
PathAlignmentTimeout());
|
||||
return true;
|
||||
}
|
||||
|
||||
bool
|
||||
|
|
Loading…
Reference in New Issue