1
1
Fork 0
mirror of https://github.com/oxen-io/lokinet synced 2023-12-14 06:53:00 +01:00
lokinet/llarp/service/sendcontext.cpp

126 lines
3.5 KiB
C++
Raw Normal View History

2019-04-19 18:02:32 +02:00
#include <service/sendcontext.hpp>
#include <router/abstractrouter.hpp>
2019-06-20 00:30:07 +02:00
#include <routing/path_transfer_message.hpp>
2019-06-15 16:55:14 +02:00
#include <service/endpoint.hpp>
2019-09-01 15:26:16 +02:00
#include <util/thread/logic.hpp>
2019-07-31 01:42:13 +02:00
#include <utility>
#include <unordered_set>
2019-04-19 18:02:32 +02:00
namespace llarp
{
namespace service
{
2020-01-06 22:08:31 +01:00
/// Size argument handed to the m_SendQueue constructor by SendContext's
/// constructor (presumably the queue's capacity in entries — confirm against
/// the queue type).
static constexpr size_t SendContextQueueSize{512};
/// Build a send context for talking to a remote service.
///
/// @param ident remote service identity; moved into remoteIdent
/// @param intro introduction copied into remoteIntro
/// @param send  path set stored in m_PathSet
/// @param ep    endpoint stored as both m_DataHandler and m_Endpoint; it is
///              dereferenced here (ep->Now()) so it must not be null
SendContext::SendContext(
    ServiceInfo ident, const Introduction& intro, path::PathSet* send, Endpoint* ep)
  : remoteIdent(std::move(ident))
  , remoteIntro(intro)
  , m_PathSet(send)
  , m_DataHandler(ep)
  , m_Endpoint(ep)
  , createdAt(ep->Now())  // creation time as reported by the endpoint
  , m_SendQueue(SendContextQueueSize)
{}
bool
SendContext::Send(std::shared_ptr<ProtocolFrame> msg, path::Path_ptr path)
2019-04-19 18:02:32 +02:00
{
if (m_SendQueue.empty() or m_SendQueue.full())
2020-01-06 22:08:31 +01:00
{
LogicCall(m_Endpoint->RouterLogic(), [self = this]() { self->FlushUpstream(); });
2020-01-06 22:08:31 +01:00
}
m_SendQueue.pushBack(std::make_pair(
std::make_shared<const routing::PathTransferMessage>(*msg, remoteIntro.pathID), path));
2019-04-25 19:15:56 +02:00
return true;
}
void
SendContext::FlushUpstream()
2019-04-25 19:15:56 +02:00
{
auto r = m_Endpoint->Router();
std::unordered_set<path::Path_ptr, path::Path::Ptr_Hash> flushpaths;
2019-04-25 19:15:56 +02:00
{
2020-01-06 22:08:31 +01:00
do
2019-04-19 18:02:32 +02:00
{
2020-01-06 22:08:31 +01:00
auto maybe = m_SendQueue.tryPopFront();
if (not maybe)
2020-01-06 22:08:31 +01:00
break;
auto& item = *maybe;
if (item.second->SendRoutingMessage(*item.first, r))
{
lastGoodSend = r->Now();
flushpaths.emplace(item.second);
m_Endpoint->MarkConvoTagActive(item.first->T.T);
}
} while (not m_SendQueue.empty());
}
// flush the select path's upstream
for (const auto& path : flushpaths)
{
path->FlushUpstream(r);
2019-04-25 19:15:56 +02:00
}
2019-04-19 18:02:32 +02:00
}
/// send on an established convo tag
void
SendContext::EncryptAndSendTo(const llarp_buffer_t& payload, ProtocolType t)
{
SharedSecret shared;
auto f = std::make_shared<ProtocolFrame>();
f->N.Randomize();
f->T = currentConvoTag;
f->S = ++sequenceNo;
2019-04-19 18:02:32 +02:00
auto path = m_PathSet->GetRandomPathByRouter(remoteIntro.router);
if (!path)
2019-04-19 18:02:32 +02:00
{
LogError(m_Endpoint->Name(), " cannot encrypt and send: no path for intro ", remoteIntro);
2019-04-19 18:02:32 +02:00
return;
}
if (!m_DataHandler->GetCachedSessionKeyFor(f->T, shared))
2019-04-19 18:02:32 +02:00
{
LogError(m_Endpoint->Name(), " has no cached session key on session T=", f->T);
2019-04-19 18:02:32 +02:00
return;
}
2019-05-22 18:20:50 +02:00
auto m = std::make_shared<ProtocolMessage>();
m_DataHandler->PutIntroFor(f->T, remoteIntro);
m_DataHandler->PutReplyIntroFor(f->T, path->intro);
m->proto = t;
m->seqno = m_Endpoint->GetSeqNoForConvo(f->T);
m->introReply = path->intro;
f->F = m->introReply.pathID;
m->sender = m_Endpoint->GetIdentity().pub;
m->tag = f->T;
m->PutBuffer(payload);
auto self = this;
m_Endpoint->Router()->QueueWork([f, m, shared, path, self]() {
if (not f->EncryptAndSign(*m, shared, self->m_Endpoint->GetIdentity()))
{
LogError(self->m_Endpoint->Name(), " failed to sign message");
return;
}
2019-11-29 06:03:26 +01:00
self->Send(f, path);
});
2019-04-19 18:02:32 +02:00
}
void
SendContext::AsyncEncryptAndSendTo(const llarp_buffer_t& data, ProtocolType protocol)
2019-04-19 18:02:32 +02:00
{
if (lastGoodSend != 0s)
2019-04-19 18:02:32 +02:00
{
EncryptAndSendTo(data, protocol);
}
else
{
AsyncGenIntro(data, protocol);
}
}
} // namespace service
} // namespace llarp