1
1
Fork 0
mirror of https://github.com/oxen-io/lokinet synced 2023-12-14 06:53:00 +01:00
lokinet/llarp/iwp/session.cpp

1024 lines
29 KiB
C++
Raw Normal View History

#include "session.hpp"
2019-09-01 14:38:03 +02:00
#include <llarp/messages/link_intro.hpp>
#include <llarp/messages/discard.hpp>
#include <llarp/util/meta/memfn.hpp>
#include <llarp/router/abstractrouter.hpp>
2019-08-22 22:53:27 +02:00
#include <queue>
2019-08-22 22:53:27 +02:00
namespace llarp
{
namespace iwp
{
2019-09-12 16:34:27 +02:00
ILinkSession::Packet_t
CreatePacket(Command cmd, size_t plainsize, size_t minpad, size_t variance)
{
const size_t pad = minpad > 0 ? minpad + (variance > 0 ? randint() % variance : 0) : 0;
ILinkSession::Packet_t pkt(PacketOverhead + plainsize + pad + CommandOverhead);
2019-09-12 16:34:27 +02:00
// randomize pad
if (pad)
2019-09-12 16:34:27 +02:00
{
CryptoManager::instance()->randbytes(
pkt.data() + PacketOverhead + CommandOverhead + plainsize, pad);
2019-09-12 16:34:27 +02:00
}
// randomize nounce
CryptoManager::instance()->randbytes(pkt.data() + HMACSIZE, TUNNONCESIZE);
2022-05-26 17:59:44 +02:00
pkt[PacketOverhead] = llarp::constants::proto_version;
2019-09-12 16:34:27 +02:00
pkt[PacketOverhead + 1] = cmd;
return pkt;
}
constexpr size_t PlaintextQueueSize = 512;
2021-02-22 14:26:32 +01:00
Session::Session(LinkLayer* p, const RouterContact& rc, const AddressInfo& ai)
2019-08-22 22:53:27 +02:00
: m_State{State::Initial}
, m_Inbound{false}
2019-11-20 17:10:52 +01:00
, m_Parent(p)
2019-08-22 22:53:27 +02:00
, m_CreatedAt{p->Now()}
, m_RemoteAddr{ai}
2019-11-20 17:10:52 +01:00
, m_ChosenAI(ai)
, m_RemoteRC(rc)
2021-02-22 14:26:32 +01:00
, m_PlaintextRecv{PlaintextQueueSize}
2019-08-22 22:53:27 +02:00
{
token.Zero();
m_PlaintextEmpty.test_and_set();
2019-08-22 22:53:27 +02:00
GotLIM = util::memFn(&Session::GotOutboundLIM, this);
CryptoManager::instance()->shorthash(m_SessionKey, llarp_buffer_t(rc.pubkey));
2019-08-22 22:53:27 +02:00
}
Session::Session(LinkLayer* p, const SockAddr& from)
2019-08-22 22:53:27 +02:00
: m_State{State::Initial}
, m_Inbound{true}
2019-11-20 17:10:52 +01:00
, m_Parent(p)
2019-08-22 22:53:27 +02:00
, m_CreatedAt{p->Now()}
, m_RemoteAddr{from}
2021-02-22 14:26:32 +01:00
, m_PlaintextRecv{PlaintextQueueSize}
2019-08-22 22:53:27 +02:00
{
token.Randomize();
m_PlaintextEmpty.test_and_set();
GotLIM = util::memFn(&Session::GotInboundLIM, this);
const PubKey pk = m_Parent->GetOurRC().pubkey;
CryptoManager::instance()->shorthash(m_SessionKey, llarp_buffer_t(pk));
2019-08-22 22:53:27 +02:00
}
void
2019-11-19 21:30:51 +01:00
Session::Send_LL(const byte_t* buf, size_t sz)
2019-08-22 22:53:27 +02:00
{
LogTrace("send ", sz, " to ", m_RemoteAddr);
2019-11-19 21:30:51 +01:00
const llarp_buffer_t pkt(buf, sz);
m_Parent->SendTo_LL(m_RemoteAddr, pkt);
2019-08-22 22:53:27 +02:00
m_LastTX = time_now_ms();
2019-12-17 15:36:56 +01:00
m_TXRate += sz;
2019-08-22 22:53:27 +02:00
}
bool
2019-08-23 13:32:52 +02:00
Session::GotInboundLIM(const LinkIntroMessage* msg)
2019-08-22 22:53:27 +02:00
{
if (msg->rc.pubkey != m_ExpectedIdent)
2019-08-23 13:32:52 +02:00
{
LogError(
2020-05-15 15:07:28 +02:00
"ident key mismatch from ", m_RemoteAddr, " ", msg->rc.pubkey, " != ", m_ExpectedIdent);
2019-08-22 22:53:27 +02:00
return false;
2019-08-23 13:32:52 +02:00
}
m_State = State::Ready;
GotLIM = util::memFn(&Session::GotRenegLIM, this);
2019-08-23 13:32:52 +02:00
m_RemoteRC = msg->rc;
m_Parent->MapAddr(m_RemoteRC.pubkey, this);
2020-06-11 21:02:34 +02:00
return m_Parent->SessionEstablished(this, true);
2019-08-22 22:53:27 +02:00
}
bool
2019-08-23 13:32:52 +02:00
Session::GotOutboundLIM(const LinkIntroMessage* msg)
2019-08-22 22:53:27 +02:00
{
if (msg->rc.pubkey != m_RemoteRC.pubkey)
2019-08-23 13:32:52 +02:00
{
LogError("ident key mismatch");
2019-08-22 22:53:27 +02:00
return false;
2019-08-23 13:32:52 +02:00
}
2021-07-01 14:30:29 +02:00
2019-08-23 13:32:52 +02:00
m_RemoteRC = msg->rc;
GotLIM = util::memFn(&Session::GotRenegLIM, this);
assert(shared_from_this().use_count() > 1);
SendOurLIM([self = shared_from_this()](ILinkSession::DeliveryStatus st) {
if (st == ILinkSession::DeliveryStatus::eDeliverySuccess)
2019-08-23 13:32:52 +02:00
{
self->m_State = State::Ready;
self->m_Parent->MapAddr(self->m_RemoteRC.pubkey, self.get());
2020-06-11 21:02:34 +02:00
self->m_Parent->SessionEstablished(self.get(), false);
2019-08-23 13:32:52 +02:00
}
});
2019-08-22 22:53:27 +02:00
return true;
}
void
2019-08-23 13:32:52 +02:00
Session::SendOurLIM(ILinkSession::CompletionHandler h)
2019-08-22 22:53:27 +02:00
{
LinkIntroMessage msg;
msg.rc = m_Parent->GetOurRC();
msg.N.Randomize();
msg.P = 60000;
if (not msg.Sign(m_Parent->Sign))
2019-08-22 22:53:27 +02:00
{
LogError("failed to sign our RC for ", m_RemoteAddr);
return;
}
ILinkSession::Message_t data(LinkIntroMessage::MaxSize + PacketOverhead);
llarp_buffer_t buf(data);
if (not msg.BEncode(&buf))
2019-08-22 22:53:27 +02:00
{
LogError("failed to encode LIM for ", m_RemoteAddr);
return;
2019-08-22 22:53:27 +02:00
}
if (not SendMessageBuffer(std::move(data), h))
2019-08-22 22:53:27 +02:00
{
LogError("failed to send LIM to ", m_RemoteAddr);
return;
2019-08-22 22:53:27 +02:00
}
LogTrace("sent LIM to ", m_RemoteAddr);
2019-08-22 22:53:27 +02:00
}
void
2019-09-12 16:34:27 +02:00
Session::EncryptAndSend(ILinkSession::Packet_t data)
2019-08-22 22:53:27 +02:00
{
2021-02-22 14:26:32 +01:00
m_EncryptNext.emplace_back(std::move(data));
TriggerPump();
if (!IsEstablished())
{
2019-09-05 16:57:01 +02:00
EncryptWorker(std::move(m_EncryptNext));
2021-02-22 14:26:32 +01:00
m_EncryptNext = CryptoQueue_t{};
}
2019-09-05 16:57:01 +02:00
}
void
2021-02-22 14:26:32 +01:00
Session::EncryptWorker(CryptoQueue_t msgs)
2019-09-05 16:57:01 +02:00
{
LogTrace("encrypt worker ", msgs.size(), " messages");
2021-02-22 14:26:32 +01:00
for (auto& pkt : msgs)
2019-09-05 16:57:01 +02:00
{
2021-02-22 14:26:32 +01:00
llarp_buffer_t pktbuf{pkt};
const TunnelNonce nonce_ptr{pkt.data() + HMACSIZE};
2019-09-05 16:57:01 +02:00
pktbuf.base += PacketOverhead;
pktbuf.cur = pktbuf.base;
pktbuf.sz -= PacketOverhead;
CryptoManager::instance()->xchacha20(pktbuf, m_SessionKey, nonce_ptr);
pktbuf.base = pkt.data() + HMACSIZE;
pktbuf.sz = pkt.size() - HMACSIZE;
2019-09-05 16:57:01 +02:00
CryptoManager::instance()->hmac(pkt.data(), pktbuf, m_SessionKey);
2019-11-19 21:30:51 +01:00
Send_LL(pkt.data(), pkt.size());
2019-09-05 16:57:01 +02:00
}
2019-08-22 22:53:27 +02:00
}
void
Session::Close()
{
if (m_State == State::Closed)
2019-08-22 22:53:27 +02:00
return;
2019-09-12 16:34:27 +02:00
auto close_msg = CreatePacket(Command::eCLOS, 0, 16, 16);
m_Parent->UnmapAddr(m_RemoteAddr);
2019-08-22 22:53:27 +02:00
m_State = State::Closed;
if (m_SentClosed.test_and_set())
return;
2019-12-05 17:31:58 +01:00
EncryptAndSend(std::move(close_msg));
2021-07-01 14:30:29 +02:00
LogInfo(m_Parent->PrintableName(), " closing connection to ", m_RemoteAddr);
2019-08-22 22:53:27 +02:00
}
bool
Session::SendMessageBuffer(
ILinkSession::Message_t buf, ILinkSession::CompletionHandler completed, uint16_t priority)
2019-08-22 22:53:27 +02:00
{
if (m_TXMsgs.size() >= MaxSendQueueSize)
2021-04-12 17:59:11 +02:00
{
if (completed)
2021-04-12 17:59:11 +02:00
completed(ILinkSession::DeliveryStatus::eDeliveryDropped);
return false;
2021-04-12 17:59:11 +02:00
}
const auto now = m_Parent->Now();
2019-08-22 22:53:27 +02:00
const auto msgid = m_TXID++;
2020-03-23 18:20:32 +01:00
const auto bufsz = buf.size();
auto& msg =
m_TXMsgs.emplace(msgid, OutboundMessage{msgid, std::move(buf), now, completed, priority})
.first->second;
TriggerPump();
EncryptAndSend(msg.XMIT());
if (bufsz > FragmentSize)
2019-11-30 03:57:40 +01:00
{
msg.FlushUnAcked(util::memFn(&Session::EncryptAndSend, this), now);
}
2020-02-07 19:43:40 +01:00
m_Stats.totalInFlightTX++;
LogDebug("send message ", msgid, " to ", m_RemoteAddr);
2019-08-22 22:53:27 +02:00
return true;
}
void
Session::SendMACK()
{
// send multi acks
while (not m_SendMACKs.empty())
{
const auto sz = m_SendMACKs.size();
const auto max = Session::MaxACKSInMACK;
auto numAcks = std::min(sz, max);
auto mack = CreatePacket(Command::eMACK, 1 + (numAcks * sizeof(uint64_t)));
mack[PacketOverhead + CommandOverhead] = byte_t{static_cast<byte_t>(numAcks)};
byte_t* ptr = mack.data() + 3 + PacketOverhead;
LogTrace("send ", numAcks, " macks to ", m_RemoteAddr);
const auto& itr = m_SendMACKs.top();
while (numAcks > 0)
{
oxenc::write_host_as_big(itr, ptr);
m_SendMACKs.pop();
numAcks--;
ptr += sizeof(uint64_t);
}
2019-09-12 16:34:27 +02:00
EncryptAndSend(std::move(mack));
}
}
void
Session::TriggerPump()
{
m_Parent->Router()->TriggerPump();
}
2019-08-22 22:53:27 +02:00
void
Session::Pump()
{
const auto now = m_Parent->Now();
if (m_State == State::Ready || m_State == State::LinkIntro)
2019-08-22 22:53:27 +02:00
{
if (ShouldPing())
2019-08-23 13:57:57 +02:00
SendKeepAlive();
for (auto& [id, msg] : m_RXMsgs)
2019-08-22 22:53:27 +02:00
{
if (msg.ShouldSendACKS(now))
2019-08-22 22:53:27 +02:00
{
msg.SendACKS(util::memFn(&Session::EncryptAndSend, this), now);
2019-08-22 22:53:27 +02:00
}
}
std::priority_queue<
OutboundMessage*,
std::vector<OutboundMessage*>,
ComparePtr<OutboundMessage*>>
2022-04-05 00:12:27 +02:00
to_resend;
for (auto& [id, msg] : m_TXMsgs)
2019-08-22 22:53:27 +02:00
{
if (msg.ShouldFlush(now))
2022-04-05 00:12:27 +02:00
to_resend.push(&msg);
}
2022-04-05 00:12:27 +02:00
if (not to_resend.empty())
{
2022-04-05 00:12:27 +02:00
for (auto& msg = to_resend.top(); not to_resend.empty(); to_resend.pop())
msg->FlushUnAcked(util::memFn(&Session::EncryptAndSend, this), now);
2019-08-22 22:53:27 +02:00
}
}
2021-02-22 14:26:32 +01:00
if (not m_EncryptNext.empty())
{
m_Parent->QueueWork(
[self = shared_from_this(), data = m_EncryptNext] { self->EncryptWorker(data); });
2021-02-22 14:26:32 +01:00
m_EncryptNext.clear();
}
2019-09-05 16:57:01 +02:00
2021-02-22 14:26:32 +01:00
if (not m_DecryptNext.empty())
{
m_Parent->QueueWork(
[self = shared_from_this(), data = m_DecryptNext] { self->DecryptWorker(data); });
2021-02-22 14:26:32 +01:00
m_DecryptNext.clear();
}
2019-08-22 22:53:27 +02:00
}
2019-08-23 13:32:52 +02:00
bool
Session::GotRenegLIM(const LinkIntroMessage* lim)
2019-08-22 22:53:27 +02:00
{
2019-08-23 14:30:07 +02:00
LogDebug("renegotiate session on ", m_RemoteAddr);
2019-08-22 22:53:27 +02:00
return m_Parent->SessionRenegotiate(lim->rc, m_RemoteRC);
}
2019-08-23 13:32:52 +02:00
bool
2019-08-22 22:53:27 +02:00
Session::RenegotiateSession()
{
SendOurLIM();
return true;
}
bool
Session::ShouldPing() const
{
if (m_State == State::Ready)
2019-08-23 13:32:52 +02:00
{
const auto now = m_Parent->Now();
2019-08-23 13:32:52 +02:00
return now - m_LastTX > PingInterval;
}
return false;
2019-08-22 22:53:27 +02:00
}
2020-06-04 18:00:30 +02:00
SessionStats
Session::GetSessionStats() const
{
// TODO: thread safety
return m_Stats;
}
2019-08-22 22:53:27 +02:00
util::StatusObject
Session::ExtractStatus() const
{
2020-02-07 19:43:40 +01:00
const auto now = m_Parent->Now();
2021-03-05 18:31:52 +01:00
return {
{"txRateCurrent", m_Stats.currentRateTX},
{"rxRateCurrent", m_Stats.currentRateRX},
{"rxPktsRcvd", m_Stats.totalPacketsRX},
// leave 'tx' and 'rx' as duplicates of 'xRateCurrent' for compat
{"tx", m_Stats.currentRateTX},
{"rx", m_Stats.currentRateRX},
{"txPktsAcked", m_Stats.totalAckedTX},
{"txPktsDropped", m_Stats.totalDroppedTX},
{"txPktsInFlight", m_Stats.totalInFlightTX},
{"state", StateToString(m_State)},
{"inbound", m_Inbound},
{"replayFilter", m_ReplayFilter.size()},
{"txMsgQueueSize", m_TXMsgs.size()},
{"rxMsgQueueSize", m_RXMsgs.size()},
{"remoteAddr", m_RemoteAddr.ToString()},
2021-03-05 18:31:52 +01:00
{"remoteRC", m_RemoteRC.ExtractStatus()},
{"created", to_json(m_CreatedAt)},
{"uptime", to_json(now - m_CreatedAt)}};
2019-08-22 22:53:27 +02:00
}
bool
Session::TimedOut(llarp_time_t now) const
{
if (m_State == State::Ready)
2019-08-23 13:32:52 +02:00
{
return now > m_LastRX
&& now - m_LastRX
> (m_Inbound and not m_RemoteRC.IsPublicRouter() ? DefaultLinkSessionLifetime
: SessionAliveTimeout);
2019-08-23 13:32:52 +02:00
}
return now - m_CreatedAt >= LinkLayerConnectTimeout;
2019-08-22 22:53:27 +02:00
}
2019-12-17 15:36:56 +01:00
bool
Session::ShouldResetRates(llarp_time_t now) const
{
return now >= m_ResetRatesAt;
}
void
Session::ResetRates()
{
2020-02-07 19:43:40 +01:00
m_Stats.currentRateTX = m_TXRate;
m_Stats.currentRateRX = m_RXRate;
m_RXRate = 0;
m_TXRate = 0;
2019-12-17 15:36:56 +01:00
}
void
Session::Tick(llarp_time_t now)
2019-08-22 22:53:27 +02:00
{
if (ShouldResetRates(now))
2019-12-17 15:36:56 +01:00
{
ResetRates();
2020-02-24 20:40:45 +01:00
m_ResetRatesAt = now + 1s;
2019-12-17 15:36:56 +01:00
}
// remove pending outbound messsages that timed out
// inform waiters
{
2019-08-27 14:13:55 +02:00
auto itr = m_TXMsgs.begin();
while (itr != m_TXMsgs.end())
{
if (itr->second.IsTimedOut(now))
2019-08-27 14:13:55 +02:00
{
2020-02-07 19:43:40 +01:00
m_Stats.totalDroppedTX++;
m_Stats.totalInFlightTX--;
LogTrace("Dropped unacked packet to ", m_RemoteAddr);
2019-08-27 14:13:55 +02:00
itr->second.InformTimeout();
itr = m_TXMsgs.erase(itr);
}
else
++itr;
}
}
{
// remove pending inbound messages that timed out
auto itr = m_RXMsgs.begin();
while (itr != m_RXMsgs.end())
2019-08-27 14:13:55 +02:00
{
if (itr->second.IsTimedOut(now))
2019-08-27 14:13:55 +02:00
{
m_ReplayFilter.emplace(itr->first, now);
2019-08-27 14:13:55 +02:00
itr = m_RXMsgs.erase(itr);
}
else
++itr;
}
}
2019-08-28 13:11:03 +02:00
{
// decay replay window
auto itr = m_ReplayFilter.begin();
while (itr != m_ReplayFilter.end())
2019-08-28 13:11:03 +02:00
{
if (itr->second + ReplayWindow <= now)
2019-08-28 13:11:03 +02:00
{
itr = m_ReplayFilter.erase(itr);
}
else
++itr;
}
}
2019-08-22 22:53:27 +02:00
}
using Introduction =
AlignedBuffer<PubKey::SIZE + PubKey::SIZE + TunnelNonce::SIZE + Signature::SIZE>;
2019-08-22 22:53:27 +02:00
2019-08-23 13:32:52 +02:00
void
2019-08-22 22:53:27 +02:00
Session::GenerateAndSendIntro()
{
TunnelNonce N;
N.Randomize();
2019-11-20 17:10:52 +01:00
{
ILinkSession::Packet_t req(Introduction::SIZE + PacketOverhead);
const auto pk = m_Parent->GetOurRC().pubkey;
2019-11-20 17:10:52 +01:00
const auto e_pk = m_Parent->RouterEncryptionSecret().toPublic();
auto itr = req.data() + PacketOverhead;
2019-11-20 17:10:52 +01:00
std::copy_n(pk.data(), pk.size(), itr);
itr += pk.size();
std::copy_n(e_pk.data(), e_pk.size(), itr);
itr += e_pk.size();
std::copy_n(N.data(), N.size(), itr);
Signature Z;
llarp_buffer_t signbuf(req.data() + PacketOverhead, Introduction::SIZE - Signature::SIZE);
2019-11-20 17:10:52 +01:00
m_Parent->Sign(Z, signbuf);
std::copy_n(
Z.data(),
Z.size(),
req.data() + PacketOverhead + (Introduction::SIZE - Signature::SIZE));
CryptoManager::instance()->randbytes(req.data() + HMACSIZE, TUNNONCESIZE);
2019-11-20 17:10:52 +01:00
EncryptAndSend(std::move(req));
}
2019-08-22 22:53:27 +02:00
m_State = State::Introduction;
if (not CryptoManager::instance()->transport_dh_client(
m_SessionKey, m_ChosenAI.pubkey, m_Parent->RouterEncryptionSecret(), N))
2019-09-05 16:57:01 +02:00
{
LogError("failed to transport_dh_client on outbound session to ", m_RemoteAddr);
2019-09-05 16:57:01 +02:00
return;
}
LogTrace("sent intro to ", m_RemoteAddr);
2019-08-22 22:53:27 +02:00
}
2019-08-23 13:32:52 +02:00
2019-08-22 22:53:27 +02:00
void
2019-09-12 16:34:27 +02:00
Session::HandleCreateSessionRequest(Packet_t pkt)
2019-08-22 22:53:27 +02:00
{
if (not DecryptMessageInPlace(pkt))
2019-08-22 22:53:27 +02:00
{
2021-07-01 14:30:29 +02:00
LogError(
m_Parent->PrintableName(), " failed to decrypt session request from ", m_RemoteAddr);
2019-08-22 22:53:27 +02:00
return;
}
if (pkt.size() < token.size() + PacketOverhead)
2019-08-22 22:53:27 +02:00
{
LogError(
2021-07-01 14:30:29 +02:00
m_Parent->PrintableName(),
" bad session request size, ",
pkt.size(),
" < ",
token.size() + PacketOverhead,
" from ",
m_RemoteAddr);
2019-08-22 22:53:27 +02:00
return;
}
2019-10-02 15:06:14 +02:00
const auto begin = pkt.data() + PacketOverhead;
if (not std::equal(begin, begin + token.size(), token.data()))
2019-08-22 22:53:27 +02:00
{
2021-07-01 14:30:29 +02:00
LogError(m_Parent->PrintableName(), " token mismatch from ", m_RemoteAddr);
2019-08-22 22:53:27 +02:00
return;
}
2019-08-23 13:32:52 +02:00
m_LastRX = m_Parent->Now();
m_State = State::LinkIntro;
2019-08-22 22:53:27 +02:00
SendOurLIM();
}
2019-08-23 13:32:52 +02:00
void
2019-09-12 16:34:27 +02:00
Session::HandleGotIntro(Packet_t pkt)
2019-08-22 22:53:27 +02:00
{
if (pkt.size() < (Introduction::SIZE + PacketOverhead))
2019-08-23 13:32:52 +02:00
{
2021-07-01 14:30:29 +02:00
LogWarn(m_Parent->PrintableName(), " intro too small from ", m_RemoteAddr);
2019-08-22 22:53:27 +02:00
return;
2019-08-23 13:32:52 +02:00
}
byte_t* ptr = pkt.data() + PacketOverhead;
2019-08-22 22:53:27 +02:00
TunnelNonce N;
2019-10-28 22:39:24 +01:00
std::copy_n(ptr, PubKey::SIZE, m_ExpectedIdent.data());
ptr += PubKey::SIZE;
2019-10-28 22:39:24 +01:00
std::copy_n(ptr, PubKey::SIZE, m_RemoteOnionKey.data());
ptr += PubKey::SIZE;
2019-10-28 22:39:24 +01:00
std::copy_n(ptr, TunnelNonce::SIZE, N.data());
ptr += TunnelNonce::SIZE;
Signature Z;
2019-10-28 22:46:39 +01:00
std::copy_n(ptr, Z.size(), Z.data());
const llarp_buffer_t verifybuf(
pkt.data() + PacketOverhead, Introduction::SIZE - Signature::SIZE);
if (!CryptoManager::instance()->verify(m_ExpectedIdent, verifybuf, Z))
{
2021-07-01 14:30:29 +02:00
LogError(m_Parent->PrintableName(), " intro verify failed from ", m_RemoteAddr);
return;
}
2019-08-22 22:53:27 +02:00
const PubKey pk = m_Parent->TransportSecretKey().toPublic();
LogDebug(
"got intro: remote-pk=",
m_RemoteOnionKey.ToHex(),
" N=",
N.ToHex(),
" local-pk=",
pk.ToHex());
if (not CryptoManager::instance()->transport_dh_server(
m_SessionKey, m_RemoteOnionKey, m_Parent->TransportSecretKey(), N))
{
LogError("failed to transport_dh_server on inbound intro from ", m_RemoteAddr);
2019-08-22 22:53:27 +02:00
return;
}
2019-10-02 15:06:14 +02:00
Packet_t reply(token.size() + PacketOverhead);
2019-09-12 16:34:27 +02:00
// random nonce
CryptoManager::instance()->randbytes(reply.data() + HMACSIZE, TUNNONCESIZE);
2019-09-12 16:34:27 +02:00
// set token
2019-10-28 22:39:24 +01:00
std::copy_n(token.data(), token.size(), reply.data() + PacketOverhead);
2019-08-22 22:53:27 +02:00
m_LastRX = m_Parent->Now();
2019-09-12 16:34:27 +02:00
EncryptAndSend(std::move(reply));
2019-08-23 13:32:52 +02:00
LogDebug("sent intro ack to ", m_RemoteAddr);
2019-08-22 22:53:27 +02:00
m_State = State::Introduction;
}
void
2019-09-12 16:34:27 +02:00
Session::HandleGotIntroAck(Packet_t pkt)
2019-08-22 22:53:27 +02:00
{
if (pkt.size() < (token.size() + PacketOverhead))
2019-08-22 22:53:27 +02:00
{
LogError(
2021-07-01 14:30:29 +02:00
m_Parent->PrintableName(),
" bad intro ack size ",
pkt.size(),
" < ",
token.size() + PacketOverhead,
" from ",
m_RemoteAddr);
2019-08-22 22:53:27 +02:00
return;
}
2019-10-02 15:06:14 +02:00
Packet_t reply(token.size() + PacketOverhead);
if (not DecryptMessageInPlace(pkt))
2019-08-22 22:53:27 +02:00
{
2021-07-01 14:30:29 +02:00
LogError(m_Parent->PrintableName(), " intro ack decrypt failed from ", m_RemoteAddr);
2019-08-22 22:53:27 +02:00
return;
}
m_LastRX = m_Parent->Now();
2019-10-28 22:39:24 +01:00
std::copy_n(pkt.data() + PacketOverhead, token.size(), token.data());
std::copy_n(token.data(), token.size(), reply.data() + PacketOverhead);
2019-09-12 16:34:27 +02:00
// random nounce
CryptoManager::instance()->randbytes(reply.data() + HMACSIZE, TUNNONCESIZE);
2019-09-12 16:34:27 +02:00
EncryptAndSend(std::move(reply));
2019-08-23 13:32:52 +02:00
LogDebug("sent session request to ", m_RemoteAddr);
2019-08-22 22:53:27 +02:00
m_State = State::LinkIntro;
}
bool
2019-09-12 16:34:27 +02:00
Session::DecryptMessageInPlace(Packet_t& pkt)
2019-08-22 22:53:27 +02:00
{
if (pkt.size() <= PacketOverhead)
2019-08-23 13:32:52 +02:00
{
2019-09-12 16:34:27 +02:00
LogError("packet too small from ", m_RemoteAddr);
2019-08-22 22:53:27 +02:00
return false;
2019-08-23 13:32:52 +02:00
}
2019-09-12 16:34:27 +02:00
const llarp_buffer_t buf(pkt);
2019-08-22 22:53:27 +02:00
ShortHash H;
llarp_buffer_t curbuf(buf.base, buf.sz);
2019-08-22 22:53:27 +02:00
curbuf.base += ShortHash::SIZE;
curbuf.sz -= ShortHash::SIZE;
if (not CryptoManager::instance()->hmac(H.data(), curbuf, m_SessionKey))
2019-08-22 22:53:27 +02:00
{
LogError("failed to caclulate keyed hash for ", m_RemoteAddr);
return false;
}
const ShortHash expected{buf.base};
if (H != expected)
{
2021-11-28 15:56:53 +01:00
LogDebug(
2021-07-01 14:30:29 +02:00
m_Parent->PrintableName(),
" keyed hash mismatch ",
H,
" != ",
expected,
" from ",
m_RemoteAddr,
" state=",
int(m_State),
" size=",
buf.sz);
2019-08-22 22:53:27 +02:00
return false;
}
const TunnelNonce N{curbuf.base};
2019-08-22 22:53:27 +02:00
curbuf.base += 32;
curbuf.sz -= 32;
LogTrace("decrypt: ", curbuf.sz, " bytes from ", m_RemoteAddr);
2019-09-12 16:34:27 +02:00
return CryptoManager::instance()->xchacha20(curbuf, m_SessionKey, N);
2019-08-22 22:53:27 +02:00
}
void
Session::Start()
{
if (m_Inbound)
2019-08-22 22:53:27 +02:00
return;
GenerateAndSendIntro();
}
void
2019-09-12 16:34:27 +02:00
Session::HandleSessionData(Packet_t pkt)
2019-08-22 22:53:27 +02:00
{
2021-02-22 14:26:32 +01:00
m_DecryptNext.emplace_back(std::move(pkt));
TriggerPump();
2019-09-05 16:57:01 +02:00
}
void
2021-02-22 14:26:32 +01:00
Session::DecryptWorker(CryptoQueue_t msgs)
2019-09-05 16:57:01 +02:00
{
2021-02-22 14:26:32 +01:00
auto itr = msgs.begin();
while (itr != msgs.end())
2019-08-22 22:53:27 +02:00
{
2021-02-22 14:26:32 +01:00
auto& pkt = *itr;
if (not DecryptMessageInPlace(pkt))
2019-09-05 16:57:01 +02:00
{
2021-02-22 14:26:32 +01:00
itr = msgs.erase(itr);
2019-09-05 16:57:01 +02:00
LogError("failed to decrypt session data from ", m_RemoteAddr);
continue;
}
2022-05-26 17:59:44 +02:00
if (pkt[PacketOverhead] != llarp::constants::proto_version)
2019-09-05 16:57:01 +02:00
{
LogError(
2022-05-26 17:59:44 +02:00
"protocol version mismatch ",
int(pkt[PacketOverhead]),
" != ",
llarp::constants::proto_version);
2021-02-22 14:26:32 +01:00
itr = msgs.erase(itr);
2019-09-05 16:57:01 +02:00
continue;
}
2021-02-22 14:26:32 +01:00
++itr;
2019-08-22 22:53:27 +02:00
}
2021-02-22 21:29:12 +01:00
m_PlaintextRecv.tryPushBack(std::move(msgs));
m_PlaintextEmpty.clear();
2021-02-22 14:26:32 +01:00
m_Parent->WakeupPlaintext();
2019-09-05 16:57:01 +02:00
}
void
2021-02-22 14:26:32 +01:00
Session::HandlePlaintext()
2019-09-05 16:57:01 +02:00
{
if (m_PlaintextEmpty.test_and_set())
return;
2021-11-11 18:41:36 +01:00
while (auto maybe_queue = m_PlaintextRecv.tryPopFront())
2019-08-22 22:53:27 +02:00
{
2021-11-11 18:41:36 +01:00
for (auto& result : *maybe_queue)
2019-09-05 16:57:01 +02:00
{
LogTrace("Command ", int(result[PacketOverhead + 1]), " from ", m_RemoteAddr);
2021-02-22 14:26:32 +01:00
switch (result[PacketOverhead + 1])
{
case Command::eXMIT:
HandleXMIT(std::move(result));
break;
case Command::eDATA:
HandleDATA(std::move(result));
break;
case Command::eACKS:
HandleACKS(std::move(result));
break;
case Command::ePING:
HandlePING(std::move(result));
break;
case Command::eNACK:
HandleNACK(std::move(result));
break;
case Command::eCLOS:
HandleCLOS(std::move(result));
break;
case Command::eMACK:
HandleMACK(std::move(result));
break;
default:
LogError("invalid command ", int(result[PacketOverhead + 1]), " from ", m_RemoteAddr);
}
2019-09-05 16:57:01 +02:00
}
2019-08-22 22:53:27 +02:00
}
SendMACK();
m_Parent->WakeupPlaintext();
2019-08-22 22:53:27 +02:00
}
void
2019-10-02 15:06:14 +02:00
Session::HandleMACK(Packet_t data)
{
if (data.size() < (3 + PacketOverhead))
{
LogError("impossibly short mack from ", m_RemoteAddr);
return;
}
byte_t numAcks = data[CommandOverhead + PacketOverhead];
if (data.size() < 1 + CommandOverhead + PacketOverhead + (numAcks * sizeof(uint64_t)))
{
LogError("short mack from ", m_RemoteAddr);
return;
}
LogTrace("got ", int(numAcks), " mack from ", m_RemoteAddr);
byte_t* ptr = data.data() + CommandOverhead + PacketOverhead + 1;
while (numAcks > 0)
{
auto acked = oxenc::load_big_to_host<uint64_t>(ptr);
LogTrace("mack containing txid=", acked, " from ", m_RemoteAddr);
auto itr = m_TXMsgs.find(acked);
if (itr != m_TXMsgs.end())
{
2020-02-07 19:43:40 +01:00
m_Stats.totalAckedTX++;
m_Stats.totalInFlightTX--;
itr->second.Completed();
m_TXMsgs.erase(itr);
}
else
{
LogTrace("ignored mack for txid=", acked, " from ", m_RemoteAddr);
}
ptr += sizeof(uint64_t);
numAcks--;
}
}
2019-08-22 22:53:27 +02:00
void
2019-10-02 15:06:14 +02:00
Session::HandleNACK(Packet_t data)
2019-08-23 13:32:52 +02:00
{
if (data.size() < (CommandOverhead + sizeof(uint64_t) + PacketOverhead))
{
LogError("short nack from ", m_RemoteAddr);
return;
}
auto txid = oxenc::load_big_to_host<uint64_t>(data.data() + CommandOverhead + PacketOverhead);
LogTrace("got nack on ", txid, " from ", m_RemoteAddr);
2019-08-23 13:32:52 +02:00
auto itr = m_TXMsgs.find(txid);
if (itr != m_TXMsgs.end())
2019-08-23 13:32:52 +02:00
{
EncryptAndSend(itr->second.XMIT());
2019-08-23 13:32:52 +02:00
}
2019-08-23 13:36:11 +02:00
m_LastRX = m_Parent->Now();
2019-08-23 13:32:52 +02:00
}
void
2019-10-02 15:06:14 +02:00
Session::HandleXMIT(Packet_t data)
2019-08-22 22:53:27 +02:00
{
2020-03-31 15:26:39 +02:00
static constexpr size_t XMITOverhead =
(CommandOverhead + PacketOverhead + sizeof(uint16_t) + sizeof(uint64_t)
+ ShortHash::SIZE);
if (data.size() < XMITOverhead)
2019-08-22 22:53:27 +02:00
{
LogError("short XMIT from ", m_RemoteAddr);
2019-08-22 22:53:27 +02:00
return;
}
auto* pos = data.data() + CommandOverhead + PacketOverhead;
auto sz = oxenc::load_big_to_host<uint16_t>(pos);
pos += sizeof(sz);
auto rxid = oxenc::load_big_to_host<uint64_t>(pos);
pos += sizeof(rxid);
auto p2 = pos + ShortHash::SIZE;
assert(p2 == data.data() + XMITOverhead);
LogTrace("rxid=", rxid, " sz=", sz, " h=", oxenc::to_hex(pos, p2), " from ", m_RemoteAddr);
2019-08-22 22:53:27 +02:00
m_LastRX = m_Parent->Now();
{
// check for replay
auto itr = m_ReplayFilter.find(rxid);
if (itr != m_ReplayFilter.end())
{
m_SendMACKs.emplace(rxid);
LogTrace("duplicate rxid=", rxid, " from ", m_RemoteAddr);
return;
}
}
{
2019-11-30 03:12:11 +01:00
const auto now = m_Parent->Now();
auto itr = m_RXMsgs.find(rxid);
if (itr == m_RXMsgs.end())
2019-11-30 03:08:13 +01:00
{
itr = m_RXMsgs.emplace(rxid, InboundMessage{rxid, sz, ShortHash{pos}, m_Parent->Now()})
.first;
TriggerPump();
2020-03-23 18:38:14 +01:00
sz = std::min(sz, uint16_t{FragmentSize});
if ((data.size() - XMITOverhead) == sz)
2019-11-30 03:08:13 +01:00
{
{
2019-11-30 03:57:40 +01:00
const llarp_buffer_t buf(data.data() + (data.size() - sz), sz);
itr->second.HandleData(0, buf, now);
if (not itr->second.IsCompleted())
{
return;
}
2020-05-21 16:26:51 +02:00
if (not itr->second.Verify())
2019-11-30 03:57:40 +01:00
{
LogError("bad short xmit hash from ", m_RemoteAddr);
return;
}
2019-11-30 03:08:13 +01:00
}
HandleRecvMsgCompleted(itr->second);
2019-11-30 03:08:13 +01:00
}
}
else
LogTrace("got duplicate xmit on ", rxid, " from ", m_RemoteAddr);
}
2019-08-22 22:53:27 +02:00
}
void
2019-10-02 15:06:14 +02:00
Session::HandleDATA(Packet_t data)
2019-08-22 22:53:27 +02:00
{
if (data.size() < (CommandOverhead + sizeof(uint16_t) + sizeof(uint64_t) + PacketOverhead))
2019-08-22 22:53:27 +02:00
{
2019-08-23 13:32:52 +02:00
LogError("short DATA from ", m_RemoteAddr, " ", data.size());
2019-08-22 22:53:27 +02:00
return;
}
m_LastRX = m_Parent->Now();
auto sz = oxenc::load_big_to_host<uint16_t>(data.data() + CommandOverhead + PacketOverhead);
auto rxid = oxenc::load_big_to_host<uint64_t>(
data.data() + CommandOverhead + sizeof(uint16_t) + PacketOverhead);
auto itr = m_RXMsgs.find(rxid);
if (itr == m_RXMsgs.end())
2019-08-22 22:53:27 +02:00
{
if (m_ReplayFilter.find(rxid) == m_ReplayFilter.end())
{
LogTrace("no rxid=", rxid, " for ", m_RemoteAddr);
auto nack = CreatePacket(Command::eNACK, 8);
oxenc::write_host_as_big(rxid, nack.data() + PacketOverhead + CommandOverhead);
EncryptAndSend(std::move(nack));
}
else
{
LogTrace("replay hit for rxid=", rxid, " for ", m_RemoteAddr);
m_SendMACKs.emplace(rxid);
}
2019-08-22 22:53:27 +02:00
return;
}
{
const llarp_buffer_t buf(
data.data() + PacketOverhead + 12, data.size() - (PacketOverhead + 12));
itr->second.HandleData(sz, buf, m_Parent->Now());
}
2019-08-23 13:32:52 +02:00
if (itr->second.IsCompleted())
2019-08-23 13:32:52 +02:00
{
if (itr->second.Verify())
2019-08-23 13:32:52 +02:00
{
HandleRecvMsgCompleted(itr->second);
2019-08-23 13:32:52 +02:00
}
else
{
LogError("hash mismatch for message ", itr->first);
2019-08-23 13:32:52 +02:00
}
}
2019-08-22 22:53:27 +02:00
}
void
Session::HandleRecvMsgCompleted(const InboundMessage& msg)
{
const auto rxid = msg.m_MsgID;
if (m_ReplayFilter.emplace(rxid, m_Parent->Now()).second)
{
2021-02-14 14:26:19 +01:00
m_Parent->HandleMessage(this, msg.m_Data);
2021-03-04 21:25:11 +01:00
EncryptAndSend(msg.ACKS());
LogDebug("recv'd message ", rxid, " from ", m_RemoteAddr);
}
m_RXMsgs.erase(rxid);
}
2019-08-23 13:32:52 +02:00
void
2019-10-02 15:06:14 +02:00
Session::HandleACKS(Packet_t data)
2019-08-22 22:53:27 +02:00
{
if (data.size() < (11 + PacketOverhead))
2019-08-22 22:53:27 +02:00
{
LogError("short ACKS from ", m_RemoteAddr);
2019-08-22 22:53:27 +02:00
return;
}
const auto now = m_Parent->Now();
m_LastRX = now;
auto txid = oxenc::load_big_to_host<uint64_t>(data.data() + 2 + PacketOverhead);
auto itr = m_TXMsgs.find(txid);
if (itr == m_TXMsgs.end())
2019-08-22 22:53:27 +02:00
{
LogTrace("no txid=", txid, " for ", m_RemoteAddr);
2019-08-22 22:53:27 +02:00
return;
}
itr->second.Ack(data[10 + PacketOverhead]);
2019-08-23 13:32:52 +02:00
if (itr->second.IsTransmitted())
2019-08-23 13:32:52 +02:00
{
LogDebug("sent message ", itr->first, " to ", m_RemoteAddr);
2019-08-23 13:32:52 +02:00
itr->second.Completed();
itr = m_TXMsgs.erase(itr);
}
else
{
itr->second.FlushUnAcked(util::memFn(&Session::EncryptAndSend, this), now);
}
2019-08-22 22:53:27 +02:00
}
2022-10-21 00:23:14 +02:00
void
Session::HandleCLOS(Packet_t)
2019-08-22 22:53:27 +02:00
{
2019-08-23 13:32:52 +02:00
LogInfo("remote closed by ", m_RemoteAddr);
2019-08-22 22:53:27 +02:00
Close();
}
2022-10-21 00:23:14 +02:00
void
Session::HandlePING(Packet_t)
2019-08-22 22:53:27 +02:00
{
m_LastRX = m_Parent->Now();
}
bool
Session::SendKeepAlive()
{
if (m_State == State::Ready)
2019-08-23 13:32:52 +02:00
{
2019-10-02 15:06:14 +02:00
EncryptAndSend(CreatePacket(Command::ePING, 0));
2019-08-23 13:32:52 +02:00
return true;
}
2019-08-22 22:53:27 +02:00
return false;
}
bool
Session::IsEstablished() const
{
return m_State == State::Ready;
}
bool
Session::Recv_LL(ILinkSession::Packet_t data)
2019-08-22 22:53:27 +02:00
{
2019-12-17 15:36:56 +01:00
m_RXRate += data.size();
2020-02-07 19:43:40 +01:00
// TODO: differentiate between good and bad RX packets here
m_Stats.totalPacketsRX++;
switch (m_State)
2019-08-22 22:53:27 +02:00
{
case State::Initial:
if (m_Inbound)
2019-08-22 22:53:27 +02:00
{
// initial data
// enter introduction phase
if (DecryptMessageInPlace(data))
{
HandleGotIntro(std::move(data));
}
else
{
2021-11-28 15:56:53 +01:00
LogDebug("bad intro from ", m_RemoteAddr);
return false;
}
2019-08-22 22:53:27 +02:00
}
break;
case State::Introduction:
if (m_Inbound)
2019-08-22 22:53:27 +02:00
{
// we are replying to an intro ack
HandleCreateSessionRequest(std::move(data));
2019-08-22 22:53:27 +02:00
}
else
{
// we got an intro ack
// send a session request
HandleGotIntroAck(std::move(data));
2019-08-22 22:53:27 +02:00
}
break;
case State::LinkIntro:
default:
HandleSessionData(std::move(data));
2019-08-22 22:53:27 +02:00
break;
}
return true;
2019-08-22 22:53:27 +02:00
}
2020-02-07 19:43:40 +01:00
std::string
Session::StateToString(State state)
2020-02-07 19:43:40 +01:00
{
switch (state)
2020-02-07 19:43:40 +01:00
{
case State::Initial:
return "Initial";
case State::Introduction:
return "Introduction";
case State::LinkIntro:
return "LinkIntro";
case State::Ready:
return "Ready";
case State::Closed:
return "Close";
default:
return "Invalid";
}
}
2019-08-22 22:53:27 +02:00
} // namespace iwp
2019-09-01 14:38:03 +02:00
} // namespace llarp