Merge pull request #536 from michael-loki/endpoint_refactor

Refactor service::Endpoint into multiple chunks
This commit is contained in:
Jeff 2019-04-21 15:39:41 -04:00 committed by GitHub
commit 06fcdf9816
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 1418 additions and 1185 deletions

View File

@ -214,18 +214,25 @@ set(LIB_SRC
routing/path_latency.cpp
routing/path_transfer.cpp
rpc/rpc.cpp
service/Identity.cpp
service/Intro.cpp
service/IntroSet.cpp
service/address.cpp
service/async_key_exchange.cpp
service/config.cpp
service/context.cpp
service/endpoint.cpp
service/handler.cpp
service/hidden_service_address_lookup.cpp
service/Identity.cpp
service/info.cpp
service/Intro.cpp
service/IntroSet.cpp
service/lookup.cpp
service/outbound_context.cpp
service/pendingbuffer.cpp
service/protocol.cpp
service/sendcontext.cpp
service/session.cpp
service/tag.cpp
service/tag_lookup_job.cpp
service/types.cpp
service/vanity.cpp
utp/inbound_message.cpp

View File

@ -336,9 +336,11 @@ namespace llarp
else
{
dns::Message *replyMsg = new dns::Message(std::move(msg));
using service::Address;
using service::OutboundContext;
return EnsurePathToService(
addr,
[=](const service::Address &remote, OutboundContext *ctx) {
[=](const Address &remote, OutboundContext *ctx) {
SendDNSReply(remote, ctx, replyMsg, reply, false, isV6);
},
2000);

View File

@ -0,0 +1,86 @@
#include <service/async_key_exchange.hpp>
#include <crypto/crypto.hpp>
#include <crypto/types.hpp>
#include <util/logic.hpp>
namespace llarp
{
namespace service
{
AsyncKeyExchange::AsyncKeyExchange(Logic* l, Crypto* c,
const ServiceInfo& r,
const Identity& localident,
const PQPubKey& introsetPubKey,
const Introduction& remote,
IDataHandler* h, const ConvoTag& t)
: logic(l)
, crypto(c)
, remote(r)
, m_LocalIdentity(localident)
, introPubKey(introsetPubKey)
, remoteIntro(remote)
, handler(h)
, tag(t)
{
}
void
AsyncKeyExchange::Result(void* user)
{
AsyncKeyExchange* self = static_cast< AsyncKeyExchange* >(user);
// put values
self->handler->PutCachedSessionKeyFor(self->msg.tag, self->sharedKey);
self->handler->PutIntroFor(self->msg.tag, self->remoteIntro);
self->handler->PutSenderFor(self->msg.tag, self->remote);
self->handler->PutReplyIntroFor(self->msg.tag, self->msg.introReply);
self->hook(self->frame);
delete self;
}
void
AsyncKeyExchange::Encrypt(void* user)
{
AsyncKeyExchange* self = static_cast< AsyncKeyExchange* >(user);
// derive ntru session key component
SharedSecret K;
self->crypto->pqe_encrypt(self->frame.C, K, self->introPubKey);
// randomize Nonce
self->frame.N.Randomize();
// compure post handshake session key
// PKE (A, B, N)
SharedSecret sharedSecret;
using namespace std::placeholders;
path_dh_func dh_client =
std::bind(&Crypto::dh_client, self->crypto, _1, _2, _3, _4);
if(!self->m_LocalIdentity.KeyExchange(dh_client, sharedSecret,
self->remote, self->frame.N))
{
LogError("failed to derive x25519 shared key component");
}
std::array< byte_t, 64 > tmp = {{0}};
// K
std::copy(K.begin(), K.end(), tmp.begin());
// H (K + PKE(A, B, N))
std::copy(sharedSecret.begin(), sharedSecret.end(), tmp.begin() + 32);
self->crypto->shorthash(self->sharedKey, llarp_buffer_t(tmp));
// set tag
self->msg.tag = self->tag;
// set sender
self->msg.sender = self->m_LocalIdentity.pub;
// set version
self->msg.version = LLARP_PROTO_VERSION;
// set protocol
self->msg.proto = eProtocolTraffic;
// encrypt and sign
if(self->frame.EncryptAndSign(self->crypto, self->msg, K,
self->m_LocalIdentity))
self->logic->queue_job({self, &Result});
else
{
LogError("failed to encrypt and sign");
delete self;
}
}
} // namespace service
} // namespace llarp

View File

@ -0,0 +1,48 @@
#ifndef LLARP_SERVICE_ASYNC_KEY_EXCHANGE_HPP
#define LLARP_SERVICE_ASYNC_KEY_EXCHANGE_HPP
#include <crypto/types.hpp>
#include <service/Identity.hpp>
#include <service/protocol.hpp>
namespace llarp
{
class Logic;
struct Crypto;
namespace service
{
struct AsyncKeyExchange
{
Logic* logic;
Crypto* crypto;
SharedSecret sharedKey;
ServiceInfo remote;
const Identity& m_LocalIdentity;
ProtocolMessage msg;
ProtocolFrame frame;
Introduction intro;
const PQPubKey introPubKey;
Introduction remoteIntro;
std::function< void(ProtocolFrame&) > hook;
IDataHandler* handler;
ConvoTag tag;
AsyncKeyExchange(Logic* l, Crypto* c, const ServiceInfo& r,
const Identity& localident,
const PQPubKey& introsetPubKey,
const Introduction& remote, IDataHandler* h,
const ConvoTag& t);
static void
Result(void* user);
/// given protocol message make protocol frame
static void
Encrypt(void* user);
};
} // namespace service
} // namespace llarp
#endif

File diff suppressed because it is too large Load Diff

View File

@ -7,9 +7,13 @@
#include <path/path.hpp>
#include <path/pathbuilder.hpp>
#include <service/address.hpp>
#include <service/Identity.hpp>
#include <service/handler.hpp>
#include <service/Identity.hpp>
#include <service/pendingbuffer.hpp>
#include <service/protocol.hpp>
#include <service/sendcontext.hpp>
#include <service/session.hpp>
#include <service/tag_lookup_job.hpp>
// minimum time between introset shifts
#ifndef MIN_SHIFT_INTERVAL
@ -20,10 +24,9 @@ namespace llarp
{
namespace service
{
// forward declare
struct Context;
// forward declare
struct AsyncKeyExchange;
struct Context;
struct OutboundContext;
struct Endpoint : public path::Builder,
public ILookupHolder,
@ -67,18 +70,18 @@ namespace llarp
}
/// router's logic
llarp::Logic*
Logic*
RouterLogic();
/// endpoint's logic
llarp::Logic*
Logic*
EndpointLogic();
/// borrow endpoint's net loop for sending data to user
llarp_ev_loop_ptr
EndpointNetLoop();
llarp::Crypto*
Crypto*
Crypto();
llarp_threadpool*
@ -115,14 +118,14 @@ namespace llarp
PublishIntroSetVia(AbstractRouter* r, path::Path* p);
bool
HandleGotIntroMessage(const llarp::dht::GotIntroMessage* msg) override;
HandleGotIntroMessage(const dht::GotIntroMessage* msg) override;
bool
HandleGotRouterMessage(const llarp::dht::GotRouterMessage* msg) override;
HandleGotRouterMessage(const dht::GotRouterMessage* msg) override;
bool
HandleHiddenServiceFrame(path::Path* p,
const llarp::service::ProtocolFrame* msg);
const service::ProtocolFrame* msg);
/// return true if we have an established path to a hidden service
bool
@ -188,24 +191,6 @@ namespace llarp
void
FlushSNodeTraffic();
struct PendingBuffer
{
std::vector< byte_t > payload;
ProtocolType protocol;
PendingBuffer(const llarp_buffer_t& buf, ProtocolType t)
: payload(buf.sz), protocol(t)
{
memcpy(payload.data(), buf.base, buf.sz);
}
ManagedBuffer
Buffer()
{
return ManagedBuffer{llarp_buffer_t(payload)};
}
};
bool
HandleDataDrop(path::Path* p, const PathID_t& dst, uint64_t s);
@ -217,54 +202,6 @@ namespace llarp
bool
ShouldBundleRC() const override;
struct SendContext
{
SendContext(const ServiceInfo& ident, const Introduction& intro,
PathSet* send, Endpoint* ep);
void
AsyncEncryptAndSendTo(const llarp_buffer_t& payload, ProtocolType t);
/// send a fully encrypted hidden service frame
/// via a path on our pathset with path id p
bool
Send(const ProtocolFrame& f);
llarp::SharedSecret sharedKey;
ServiceInfo remoteIdent;
Introduction remoteIntro;
ConvoTag currentConvoTag;
PathSet* m_PathSet;
IDataHandler* m_DataHandler;
Endpoint* m_Endpoint;
uint64_t sequenceNo = 0;
llarp_time_t lastGoodSend = 0;
llarp_time_t createdAt;
llarp_time_t sendTimeout = 40 * 1000;
llarp_time_t connectTimeout = 60 * 1000;
bool markedBad = false;
virtual bool
ShiftIntroduction(bool rebuild = true)
{
(void)rebuild;
return true;
};
virtual void
UpdateIntroSet(bool randomizePath = false) = 0;
virtual bool
MarkCurrentIntroBad(llarp_time_t now) = 0;
private:
void
EncryptAndSendTo(const llarp_buffer_t& payload, ProtocolType t);
virtual void
AsyncGenIntro(const llarp_buffer_t& payload, ProtocolType t) = 0;
};
static void
HandlePathDead(void*);
@ -274,107 +211,6 @@ namespace llarp
bool
ShouldBuildMore(llarp_time_t now) const override;
/// context needed to initiate an outbound hidden service session
struct OutboundContext : public path::Builder, public SendContext
{
OutboundContext(const IntroSet& introSet, Endpoint* parent);
~OutboundContext();
util::StatusObject
ExtractStatus() const;
bool
ShouldBundleRC() const override
{
return m_Endpoint->ShouldBundleRC();
}
bool
Stop() override;
bool
HandleDataDrop(path::Path* p, const PathID_t& dst, uint64_t s);
void
HandlePathDied(path::Path* p) override;
/// set to true if we are updating the remote introset right now
bool updatingIntroSet;
/// update the current selected intro to be a new best introduction
/// return true if we have changed intros
bool
ShiftIntroduction(bool rebuild = true) override;
/// mark the current remote intro as bad
bool
MarkCurrentIntroBad(llarp_time_t now) override;
/// return true if we are ready to send
bool
ReadyToSend() const;
bool
ShouldBuildMore(llarp_time_t now) const override;
/// tick internal state
/// return true to mark as dead
bool
Tick(llarp_time_t now);
/// return true if it's safe to remove ourselves
bool
IsDone(llarp_time_t now) const;
bool
CheckPathIsDead(path::Path* p, llarp_time_t dlt);
void
AsyncGenIntro(const llarp_buffer_t& payload, ProtocolType t) override;
/// issues a lookup to find the current intro set of the remote service
void
UpdateIntroSet(bool randomizePath) override;
bool
BuildOneAlignedTo(const RouterID& remote);
void
HandlePathBuilt(path::Path* path) override;
bool
SelectHop(llarp_nodedb* db, const RouterContact& prev,
RouterContact& cur, size_t hop,
llarp::path::PathRole roles) override;
bool
HandleHiddenServiceFrame(path::Path* p, const ProtocolFrame* frame);
std::string
Name() const override;
private:
/// swap remoteIntro with next intro
void
SwapIntros();
void
OnGeneratedIntroFrame(AsyncKeyExchange* k, PathID_t p);
bool
OnIntroSetUpdate(const Address& addr, const IntroSet* i,
const RouterID& endpoint);
uint64_t m_UpdateIntrosetTX = 0;
IntroSet currentIntroSet;
Introduction m_NextIntro;
std::unordered_map< Introduction, llarp_time_t, Introduction::Hash >
m_BadIntros;
llarp_time_t lastShift = 0;
uint16_t m_LookupFails = 0;
uint16_t m_BuildFails = 0;
};
// passed a sendto context when we have a path established otherwise
// nullptr if the path was not made before the timeout
using PathEnsureHook = std::function< void(Address, OutboundContext*) >;
@ -386,7 +222,7 @@ namespace llarp
uint64_t timeoutMS, bool lookupOnRandomPath = false);
using SNodeEnsureHook =
std::function< void(RouterID, llarp::exit::BaseSession*) >;
std::function< void(RouterID, exit::BaseSession*) >;
/// ensure a path to a service node by public key
void
@ -432,11 +268,17 @@ namespace llarp
void
PutNewOutboundContext(const IntroSet& introset);
uint64_t
GetSeqNoForConvo(const ConvoTag& tag);
virtual void
IntroSetPublishFail();
virtual void
IntroSetPublished();
uint64_t
GenTXID();
protected:
/// parent context that owns this endpoint
Context* const context;
@ -450,9 +292,6 @@ namespace llarp
void
PrefetchServicesByTag(const Tag& tag);
uint64_t
GetSeqNoForConvo(const ConvoTag& tag);
bool
IsolateNetwork();
@ -487,18 +326,15 @@ namespace llarp
return false;
}
uint64_t
GenTXID();
protected:
IDataHandler* m_DataHandler = nullptr;
Identity m_Identity;
std::unique_ptr< llarp::exit::BaseSession > m_Exit;
std::unique_ptr< exit::BaseSession > m_Exit;
private:
AbstractRouter* m_Router;
llarp_threadpool* m_IsolatedWorker = nullptr;
llarp::Logic* m_IsolatedLogic = nullptr;
Logic* m_IsolatedLogic = nullptr;
llarp_ev_loop_ptr m_IsolatedNetLoop = nullptr;
std::string m_Keyfile;
std::string m_Name;
@ -517,10 +353,8 @@ namespace llarp
Sessions m_DeadSessions;
using SNodeSessions =
std::unordered_multimap< RouterID,
std::unique_ptr< llarp::exit::BaseSession >,
RouterID::Hash >;
using SNodeSessions = std::unordered_multimap<
RouterID, std::unique_ptr< exit::BaseSession >, RouterID::Hash >;
SNodeSessions m_SNodeSessions;
@ -574,104 +408,12 @@ namespace llarp
/// on initialize functions
std::list< std::function< bool(void) > > m_OnInit;
struct Session
{
Introduction replyIntro;
SharedSecret sharedKey;
ServiceInfo remote;
Introduction intro;
llarp_time_t lastUsed = 0;
uint64_t seqno = 0;
util::StatusObject
ExtractStatus() const
{
util::StatusObject obj{{"lastUsed", lastUsed},
{"replyIntro", replyIntro.ExtractStatus()},
{"remote", remote.Addr().ToString()},
{"seqno", seqno},
{"intro", intro.ExtractStatus()}};
return obj;
};
bool
IsExpired(llarp_time_t now,
llarp_time_t lifetime = (path::default_lifetime * 2)) const
{
if(now <= lastUsed)
return false;
return now - lastUsed > lifetime;
}
};
/// conversations
using ConvoMap_t =
std::unordered_map< ConvoTag, Session, ConvoTag::Hash >;
ConvoMap_t m_Sessions;
struct CachedTagResult
{
const static llarp_time_t TTL = 10000;
llarp_time_t lastRequest = 0;
llarp_time_t lastModified = 0;
std::set< IntroSet > result;
Tag tag;
Endpoint* parent;
CachedTagResult(const Tag& t, Endpoint* p) : tag(t), parent(p)
{
}
~CachedTagResult()
{
}
void
Expire(llarp_time_t now);
bool
ShouldRefresh(llarp_time_t now) const
{
if(now <= lastRequest)
return false;
return (now - lastRequest) > TTL;
}
llarp::routing::IMessage*
BuildRequestMessage(uint64_t txid);
bool
HandleResponse(const std::set< IntroSet >& results);
};
struct TagLookupJob : public IServiceLookup
{
TagLookupJob(Endpoint* parent, CachedTagResult* result)
: IServiceLookup(parent, parent->GenTXID(), "taglookup")
, m_result(result)
{
}
~TagLookupJob()
{
}
llarp::routing::IMessage*
BuildRequestMessage()
{
return m_result->BuildRequestMessage(txid);
}
bool
HandleResponse(const std::set< IntroSet >& results)
{
return m_result->HandleResponse(results);
}
CachedTagResult* m_result;
};
std::unordered_map< Tag, CachedTagResult, Tag::Hash > m_PrefetchedTags;
};
} // namespace service

View File

@ -0,0 +1,45 @@
#include <service/hidden_service_address_lookup.hpp>
#include <dht/messages/findintro.hpp>
#include <service/endpoint.hpp>
namespace llarp
{
namespace service
{
HiddenServiceAddressLookup::HiddenServiceAddressLookup(Endpoint* p,
HandlerFunc h,
const Address& addr,
uint64_t tx)
: IServiceLookup(p, tx, "HSLookup"), remote(addr), handle(h)
{
}
bool
HiddenServiceAddressLookup::HandleResponse(
const std::set< IntroSet >& results)
{
LogInfo("found ", results.size(), " for ", remote.ToString());
if(results.size() > 0)
{
IntroSet selected;
for(const auto& introset : results)
{
if(selected.OtherIsNewer(introset) && introset.A.Addr() == remote)
selected = introset;
}
return handle(remote, &selected, endpoint);
}
return handle(remote, nullptr, endpoint);
}
routing::IMessage*
HiddenServiceAddressLookup::BuildRequestMessage()
{
routing::DHTMessage* msg = new routing::DHTMessage();
msg->M.emplace_back(new dht::FindIntroMessage(txid, remote, 0));
return msg;
}
} // namespace service
} // namespace llarp

View File

@ -0,0 +1,36 @@
#ifndef LLARP_SERVICE_HIDDEN_SERVICE_ADDRESS_LOOKUP_HPP
#define LLARP_SERVICE_HIDDEN_SERVICE_ADDRESS_LOOKUP_HPP
#include <messages/dht.hpp>
#include <service/IntroSet.hpp>
#include <service/lookup.hpp>
namespace llarp
{
namespace service
{
struct Endpoint;
struct HiddenServiceAddressLookup : public IServiceLookup
{
Address remote;
using HandlerFunc = std::function< bool(const Address&, const IntroSet*,
const RouterID&) >;
HandlerFunc handle;
HiddenServiceAddressLookup(Endpoint* p, HandlerFunc h,
const Address& addr, uint64_t tx);
~HiddenServiceAddressLookup()
{
}
bool
HandleResponse(const std::set< IntroSet >& results);
routing::IMessage*
BuildRequestMessage();
};
} // namespace service
} // namespace llarp
#endif

View File

@ -1,7 +1,6 @@
#include <service/lookup.hpp>
#include <path/path.hpp>
#include <service/endpoint.hpp>
#include <util/time.hpp>
namespace llarp

View File

@ -0,0 +1,545 @@
#include <service/outbound_context.hpp>
#include <router/abstractrouter.hpp>
#include <service/async_key_exchange.hpp>
#include <service/hidden_service_address_lookup.hpp>
#include <service/endpoint.hpp>
#include <nodedb.hpp>
#include <profiling.hpp>
namespace llarp
{
namespace service
{
bool
OutboundContext::Stop()
{
markedBad = true;
return path::Builder::Stop();
}
bool
OutboundContext::IsDone(llarp_time_t now) const
{
(void)now;
return AvailablePaths(path::ePathRoleAny) == 0 && ShouldRemove();
}
bool
OutboundContext::ShouldBundleRC() const
{
return m_Endpoint->ShouldBundleRC();
}
bool
OutboundContext::HandleDataDrop(path::Path* p, const PathID_t& dst,
uint64_t seq)
{
// pick another intro
if(dst == remoteIntro.pathID && remoteIntro.router == p->Endpoint())
{
LogWarn(Name(), " message ", seq, " dropped by endpoint ",
p->Endpoint(), " via ", dst);
if(MarkCurrentIntroBad(Now()))
{
LogInfo(Name(), " switched intros to ", remoteIntro.router, " via ",
remoteIntro.pathID);
}
UpdateIntroSet(true);
}
return true;
}
OutboundContext::OutboundContext(const IntroSet& introset, Endpoint* parent)
: path::Builder(parent->Router(), parent->Router()->dht(), 3,
path::default_len)
, SendContext(introset.A, {}, this, parent)
, currentIntroSet(introset)
{
updatingIntroSet = false;
for(const auto intro : introset.I)
{
if(intro.expiresAt > m_NextIntro.expiresAt)
m_NextIntro = intro;
}
}
OutboundContext::~OutboundContext()
{
}
/// actually swap intros
void
OutboundContext::SwapIntros()
{
remoteIntro = m_NextIntro;
m_DataHandler->PutIntroFor(currentConvoTag, remoteIntro);
}
bool
OutboundContext::OnIntroSetUpdate(__attribute__((unused))
const Address& addr,
const IntroSet* i,
const RouterID& endpoint)
{
if(markedBad)
return true;
updatingIntroSet = false;
if(i)
{
if(currentIntroSet.T >= i->T)
{
LogInfo("introset is old, dropping");
return true;
}
auto now = Now();
if(i->IsExpired(now))
{
LogError("got expired introset from lookup from ", endpoint);
return true;
}
currentIntroSet = *i;
if(!ShiftIntroduction())
{
LogWarn("failed to pick new intro during introset update");
}
if(GetPathByRouter(m_NextIntro.router) == nullptr)
BuildOneAlignedTo(m_NextIntro.router);
else
SwapIntros();
}
else
++m_LookupFails;
return true;
}
bool
OutboundContext::ReadyToSend() const
{
return (!remoteIntro.router.IsZero())
&& GetPathByRouter(remoteIntro.router) != nullptr;
}
void
OutboundContext::HandlePathBuilt(path::Path* p)
{
path::Builder::HandlePathBuilt(p);
/// don't use it if we are marked bad
if(markedBad)
return;
p->SetDataHandler(std::bind(&OutboundContext::HandleHiddenServiceFrame,
this, std::placeholders::_1,
std::placeholders::_2));
p->SetDropHandler(std::bind(&OutboundContext::HandleDataDrop, this,
std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3));
}
bool
OutboundContext::BuildOneAlignedTo(const RouterID& remote)
{
LogInfo(Name(), " building path to ", remote);
auto nodedb = m_Endpoint->Router()->nodedb();
std::vector< RouterContact > hops;
hops.resize(numHops);
for(size_t hop = 0; hop < numHops; ++hop)
{
if(hop == 0)
{
if(!SelectHop(nodedb, hops[0], hops[0], 0, path::ePathRoleAny))
return false;
}
else if(hop == numHops - 1)
{
// last hop
if(!nodedb->Get(remote, hops[hop]))
return false;
}
// middle hop
else
{
size_t tries = 5;
do
{
nodedb->select_random_hop_excluding(hops[hop],
{hops[hop - 1].pubkey, remote});
--tries;
} while(m_Endpoint->Router()->routerProfiling().IsBadForPath(
hops[hop].pubkey)
&& tries > 0);
return tries > 0;
}
return false;
}
Build(hops);
return true;
}
void
OutboundContext::AsyncGenIntro(const llarp_buffer_t& payload,
__attribute__((unused)) ProtocolType t)
{
auto path = m_PathSet->GetPathByRouter(remoteIntro.router);
if(path == nullptr)
{
// try parent as fallback
path = m_Endpoint->GetPathByRouter(remoteIntro.router);
if(path == nullptr)
{
BuildOneAlignedTo(remoteIntro.router);
LogWarn(Name(), " dropping intro frame, no path to ",
remoteIntro.router);
return;
}
}
currentConvoTag.Randomize();
AsyncKeyExchange* ex = new AsyncKeyExchange(
m_Endpoint->RouterLogic(), m_Endpoint->Crypto(), remoteIdent,
m_Endpoint->GetIdentity(), currentIntroSet.K, remoteIntro,
m_DataHandler, currentConvoTag);
ex->hook = std::bind(&OutboundContext::Send, this, std::placeholders::_1);
ex->msg.PutBuffer(payload);
ex->msg.introReply = path->intro;
ex->frame.F = ex->msg.introReply.pathID;
llarp_threadpool_queue_job(m_Endpoint->Worker(),
{ex, &AsyncKeyExchange::Encrypt});
}
std::string
OutboundContext::Name() const
{
return "OBContext:" + m_Endpoint->Name() + "-"
+ currentIntroSet.A.Addr().ToString();
}
void
OutboundContext::UpdateIntroSet(bool randomizePath)
{
if(updatingIntroSet || markedBad)
return;
auto addr = currentIntroSet.A.Addr();
path::Path* path = nullptr;
if(randomizePath)
path = m_Endpoint->PickRandomEstablishedPath();
else
path = m_Endpoint->GetEstablishedPathClosestTo(addr.as_array());
if(path)
{
HiddenServiceAddressLookup* job = new HiddenServiceAddressLookup(
m_Endpoint,
std::bind(&OutboundContext::OnIntroSetUpdate, this,
std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3),
addr, m_Endpoint->GenTXID());
updatingIntroSet = job->SendRequestViaPath(path, m_Endpoint->Router());
}
else
{
LogWarn("Cannot update introset no path for outbound session to ",
currentIntroSet.A.Addr().ToString());
}
}
util::StatusObject
OutboundContext::ExtractStatus() const
{
auto obj = path::Builder::ExtractStatus();
obj.Put("currentConvoTag", currentConvoTag.ToHex());
obj.Put("remoteIntro", remoteIntro.ExtractStatus());
obj.Put("sessionCreatedAt", createdAt);
obj.Put("lastGoodSend", lastGoodSend);
obj.Put("seqno", sequenceNo);
obj.Put("markedBad", markedBad);
obj.Put("lastShift", lastShift);
obj.Put("remoteIdentity", remoteIdent.Addr().ToString());
obj.Put("currentRemoteIntroset", currentIntroSet.ExtractStatus());
obj.Put("nextIntro", m_NextIntro.ExtractStatus());
std::vector< util::StatusObject > badIntrosObj;
std::transform(m_BadIntros.begin(), m_BadIntros.end(),
std::back_inserter(badIntrosObj),
[](const auto& item) -> util::StatusObject {
util::StatusObject o{
{"count", item.second},
{"intro", item.first.ExtractStatus()}};
return o;
});
obj.Put("badIntros", badIntrosObj);
return obj;
}
bool
OutboundContext::Tick(llarp_time_t now)
{
// we are probably dead af
if(m_LookupFails > 16 || m_BuildFails > 10)
return true;
// check for expiration
if(remoteIntro.ExpiresSoon(now))
{
// shift intro if it expires "soon"
ShiftIntroduction();
}
// swap if we can
if(remoteIntro != m_NextIntro)
{
if(GetPathByRouter(m_NextIntro.router) != nullptr)
{
// we can safely set remoteIntro to the next one
SwapIntros();
LogInfo(Name(), " swapped intro");
}
}
// lookup router in intro if set and unknown
if(!remoteIntro.router.IsZero())
m_Endpoint->EnsureRouterIsKnown(remoteIntro.router);
// expire bad intros
auto itr = m_BadIntros.begin();
while(itr != m_BadIntros.end())
{
if(now - itr->second > path::default_lifetime)
itr = m_BadIntros.erase(itr);
else
++itr;
}
// send control message if we look too quiet
if(lastGoodSend)
{
if(now - lastGoodSend > (sendTimeout / 2))
{
if(!GetNewestPathByRouter(remoteIntro.router))
{
BuildOneAlignedTo(remoteIntro.router);
}
Encrypted< 64 > tmp;
tmp.Randomize();
llarp_buffer_t buf(tmp.data(), tmp.size());
AsyncEncryptAndSendTo(buf, eProtocolControl);
SharedSecret k;
if(currentConvoTag.IsZero())
return false;
return !m_DataHandler->HasConvoTag(currentConvoTag);
}
}
// if we are dead return true so we are removed
return lastGoodSend
? (now >= lastGoodSend && now - lastGoodSend > sendTimeout)
: (now >= createdAt && now - createdAt > connectTimeout);
}
bool
OutboundContext::SelectHop(llarp_nodedb* db, const RouterContact& prev,
RouterContact& cur, size_t hop,
path::PathRole roles)
{
if(remoteIntro.router.IsZero())
{
SwapIntros();
}
if(hop == numHops - 1)
{
m_Endpoint->EnsureRouterIsKnown(remoteIntro.router);
if(db->Get(remoteIntro.router, cur))
return true;
++m_BuildFails;
return false;
}
else if(hop == numHops - 2)
{
return db->select_random_hop_excluding(
cur, {prev.pubkey, remoteIntro.router});
}
return path::Builder::SelectHop(db, prev, cur, hop, roles);
}
bool
OutboundContext::ShouldBuildMore(llarp_time_t now) const
{
if(markedBad)
return false;
if(path::Builder::ShouldBuildMore(now))
return true;
return !ReadyToSend();
}
bool
OutboundContext::MarkCurrentIntroBad(llarp_time_t now)
{
// insert bad intro
m_BadIntros[remoteIntro] = now;
// unconditional shift
bool shiftedRouter = false;
bool shiftedIntro = false;
// try same router
for(const auto& intro : currentIntroSet.I)
{
if(intro.ExpiresSoon(now))
continue;
if(router->routerProfiling().IsBadForPath(intro.router))
continue;
auto itr = m_BadIntros.find(intro);
if(itr == m_BadIntros.end() && intro.router == m_NextIntro.router)
{
shiftedIntro = true;
m_NextIntro = intro;
break;
}
}
if(!shiftedIntro)
{
// try any router
for(const auto& intro : currentIntroSet.I)
{
if(intro.ExpiresSoon(now))
continue;
auto itr = m_BadIntros.find(intro);
if(itr == m_BadIntros.end())
{
// TODO: this should always be true but idk if it really is
shiftedRouter = m_NextIntro.router != intro.router;
shiftedIntro = true;
m_NextIntro = intro;
break;
}
}
}
if(shiftedRouter)
{
lastShift = now;
BuildOneAlignedTo(m_NextIntro.router);
}
else if(shiftedIntro)
{
SwapIntros();
}
else
{
LogInfo(Name(), " updating introset");
UpdateIntroSet(true);
}
return shiftedIntro;
}
bool
OutboundContext::ShiftIntroduction(bool rebuild)
{
bool success = false;
auto now = Now();
if(now - lastShift < MIN_SHIFT_INTERVAL)
return false;
bool shifted = false;
// to find a intro on the same router as before
for(const auto& intro : currentIntroSet.I)
{
if(intro.ExpiresSoon(now))
continue;
if(m_BadIntros.find(intro) == m_BadIntros.end()
&& remoteIntro.router == intro.router)
{
m_NextIntro = intro;
return true;
}
}
for(const auto& intro : currentIntroSet.I)
{
m_Endpoint->EnsureRouterIsKnown(intro.router);
if(intro.ExpiresSoon(now))
continue;
if(m_BadIntros.find(intro) == m_BadIntros.end() && m_NextIntro != intro)
{
shifted = intro.router != m_NextIntro.router
|| (now < intro.expiresAt
&& intro.expiresAt - now
> 10 * 1000); // TODO: hardcoded value
m_NextIntro = intro;
success = true;
break;
}
}
if(shifted && rebuild)
{
lastShift = now;
BuildOneAlignedTo(m_NextIntro.router);
}
return success;
}
void
OutboundContext::HandlePathDied(path::Path* path)
{
// unconditionally update introset
UpdateIntroSet(true);
const RouterID endpoint(path->Endpoint());
// if a path to our current intro died...
if(endpoint == remoteIntro.router)
{
// figure out how many paths to this router we have
size_t num = 0;
ForEachPath([&](path::Path* p) {
if(p->Endpoint() == endpoint && p->IsReady())
++num;
});
// if we have more than two then we are probably fine
if(num > 2)
return;
// if we have one working one ...
if(num == 1)
{
num = 0;
ForEachPath([&](path::Path* p) {
if(p->Endpoint() == endpoint)
++num;
});
// if we have 2 or more established or pending don't do anything
if(num > 2)
return;
BuildOneAlignedTo(endpoint);
}
else if(num == 0)
{
// we have no paths to this router right now
// hop off it
Introduction picked;
// get the latest intro that isn't on that endpoint
for(const auto& intro : currentIntroSet.I)
{
if(intro.router == endpoint)
continue;
if(intro.expiresAt > picked.expiresAt)
picked = intro;
}
// we got nothing
if(picked.router.IsZero())
{
return;
}
m_NextIntro = picked;
// check if we have a path to this router
num = 0;
ForEachPath([&](path::Path* p) {
if(p->Endpoint() == m_NextIntro.router)
++num;
});
// build a path if one isn't already pending build or established
if(num == 0)
BuildOneAlignedTo(m_NextIntro.router);
SwapIntros();
}
}
}
bool
OutboundContext::HandleHiddenServiceFrame(path::Path* p,
const ProtocolFrame* frame)
{
return m_Endpoint->HandleHiddenServiceFrame(p, frame);
}
} // namespace service
} // namespace llarp

View File

@ -0,0 +1,116 @@
#ifndef LLARP_SERVICE_OUTBOUND_CONTEXT_HPP
#define LLARP_SERVICE_OUTBOUND_CONTEXT_HPP
#include <path/pathbuilder.hpp>
#include <service/sendcontext.hpp>
#include <util/status.hpp>
#include <unordered_map>
namespace llarp
{
namespace service
{
struct AsyncKeyExchange;
struct Endpoint;
/// context needed to initiate an outbound hidden service session
struct OutboundContext : public path::Builder, public SendContext
{
OutboundContext(const IntroSet& introSet, Endpoint* parent);
~OutboundContext();
util::StatusObject
ExtractStatus() const;
bool
ShouldBundleRC() const override;
bool
Stop() override;
bool
HandleDataDrop(path::Path* p, const PathID_t& dst, uint64_t s);
void
HandlePathDied(path::Path* p) override;
/// set to true if we are updating the remote introset right now
bool updatingIntroSet;
/// update the current selected intro to be a new best introduction
/// return true if we have changed intros
bool
ShiftIntroduction(bool rebuild = true) override;
/// mark the current remote intro as bad
bool
MarkCurrentIntroBad(llarp_time_t now) override;
/// return true if we are ready to send
bool
ReadyToSend() const;
bool
ShouldBuildMore(llarp_time_t now) const override;
/// tick internal state
/// return true to mark as dead
bool
Tick(llarp_time_t now);
/// return true if it's safe to remove ourselves
bool
IsDone(llarp_time_t now) const;
bool
CheckPathIsDead(path::Path* p, llarp_time_t dlt);
void
AsyncGenIntro(const llarp_buffer_t& payload, ProtocolType t) override;
/// issues a lookup to find the current intro set of the remote service
void
UpdateIntroSet(bool randomizePath) override;
bool
BuildOneAlignedTo(const RouterID& remote);
void
HandlePathBuilt(path::Path* path) override;
bool
SelectHop(llarp_nodedb* db, const RouterContact& prev, RouterContact& cur,
size_t hop, llarp::path::PathRole roles) override;
bool
HandleHiddenServiceFrame(path::Path* p, const ProtocolFrame* frame);
std::string
Name() const override;
private:
/// swap remoteIntro with next intro
void
SwapIntros();
void
OnGeneratedIntroFrame(AsyncKeyExchange* k, PathID_t p);
bool
OnIntroSetUpdate(const Address& addr, const IntroSet* i,
const RouterID& endpoint);
uint64_t m_UpdateIntrosetTX = 0;
IntroSet currentIntroSet;
Introduction m_NextIntro;
std::unordered_map< Introduction, llarp_time_t, Introduction::Hash >
m_BadIntros;
llarp_time_t lastShift = 0;
uint16_t m_LookupFails = 0;
uint16_t m_BuildFails = 0;
};
} // namespace service
} // namespace llarp
#endif

View File

@ -0,0 +1 @@
#include <service/pendingbuffer.hpp>

View File

@ -0,0 +1,36 @@
#ifndef LLARP_SERVICE_PENDINGBUFFER_HPP
#define LLARP_SERVICE_PENDINGBUFFER_HPP
#include <service/protocol.hpp>
#include <util/buffer.hpp>
#include <algorithm>
#include <iterator>
#include <vector>
namespace llarp
{
namespace service
{
struct PendingBuffer
{
std::vector< byte_t > payload;
ProtocolType protocol;
PendingBuffer(const llarp_buffer_t& buf, ProtocolType t)
: payload(buf.sz), protocol(t)
{
std::copy(buf.base, buf.base + buf.sz, std::back_inserter(payload));
}
ManagedBuffer
Buffer()
{
return ManagedBuffer{llarp_buffer_t(payload)};
}
};
} // namespace service
} // namespace llarp
#endif

View File

@ -0,0 +1,138 @@
#include <service/sendcontext.hpp>
#include <messages/path_transfer.hpp>
#include <service/endpoint.hpp>
#include <router/abstractrouter.hpp>
namespace llarp
{
namespace service
{
SendContext::SendContext(const ServiceInfo& ident,
const Introduction& intro, path::PathSet* send,
Endpoint* ep)
: remoteIdent(ident)
, remoteIntro(intro)
, m_PathSet(send)
, m_DataHandler(ep)
, m_Endpoint(ep)
{
createdAt = ep->Now();
currentConvoTag.Zero();
}
bool
SendContext::Send(const ProtocolFrame& msg)
{
auto path = m_PathSet->GetByEndpointWithID(remoteIntro.router, msg.F);
if(path)
{
const routing::PathTransferMessage transfer(msg, remoteIntro.pathID);
if(path->SendRoutingMessage(&transfer, m_Endpoint->Router()))
{
llarp::LogInfo("sent intro to ", remoteIntro.pathID, " on ",
remoteIntro.router, " seqno=", sequenceNo);
lastGoodSend = m_Endpoint->Now();
++sequenceNo;
return true;
}
else
llarp::LogError("Failed to send frame on path");
}
else
llarp::LogError("cannot send because we have no path to ",
remoteIntro.router);
return false;
}
/// send on an established convo tag
void
SendContext::EncryptAndSendTo(const llarp_buffer_t& payload, ProtocolType t)
{
auto crypto = m_Endpoint->Router()->crypto();
SharedSecret shared;
routing::PathTransferMessage msg;
ProtocolFrame& f = msg.T;
f.N.Randomize();
f.T = currentConvoTag;
f.S = m_Endpoint->GetSeqNoForConvo(f.T);
auto now = m_Endpoint->Now();
if(remoteIntro.ExpiresSoon(now))
{
// shift intro
if(MarkCurrentIntroBad(now))
{
llarp::LogInfo("intro shifted");
}
}
auto path = m_PathSet->GetNewestPathByRouter(remoteIntro.router);
if(!path)
{
llarp::LogError("cannot encrypt and send: no path for intro ",
remoteIntro);
return;
}
if(m_DataHandler->GetCachedSessionKeyFor(f.T, shared))
{
ProtocolMessage m;
m_DataHandler->PutIntroFor(f.T, remoteIntro);
m_DataHandler->PutReplyIntroFor(f.T, path->intro);
m.proto = t;
m.introReply = path->intro;
f.F = m.introReply.pathID;
m.sender = m_Endpoint->GetIdentity().pub;
m.tag = f.T;
m.PutBuffer(payload);
if(!f.EncryptAndSign(crypto, m, shared, m_Endpoint->GetIdentity()))
{
llarp::LogError("failed to sign");
return;
}
}
else
{
llarp::LogError("No cached session key");
return;
}
msg.P = remoteIntro.pathID;
msg.Y.Randomize();
if(path->SendRoutingMessage(&msg, m_Endpoint->Router()))
{
llarp::LogDebug("sent message via ", remoteIntro.pathID, " on ",
remoteIntro.router);
++sequenceNo;
lastGoodSend = now;
}
else
{
llarp::LogWarn("Failed to send routing message for data");
}
}
void
SendContext::AsyncEncryptAndSendTo(const llarp_buffer_t& data,
ProtocolType protocol)
{
auto now = m_Endpoint->Now();
if(remoteIntro.ExpiresSoon(now))
{
if(!MarkCurrentIntroBad(now))
{
llarp::LogWarn("no good path yet, your message may drop");
}
}
if(sequenceNo)
{
EncryptAndSendTo(data, protocol);
}
else
{
AsyncGenIntro(data, protocol);
}
}
} // namespace service
} // namespace llarp

View File

@ -0,0 +1,68 @@
#ifndef LLARP_SERVICE_SENDCONTEXT_HPP
#define LLARP_SERVICE_SENDCONTEXT_HPP
#include <path/pathset.hpp>
#include <service/Intro.hpp>
#include <service/protocol.hpp>
#include <util/buffer.hpp>
#include <util/types.hpp>
namespace llarp
{
namespace service
{
struct ServiceInfo;
struct Endpoint;
struct Introduction;
struct SendContext
{
SendContext(const ServiceInfo& ident, const Introduction& intro,
path::PathSet* send, Endpoint* ep);
void
AsyncEncryptAndSendTo(const llarp_buffer_t& payload, ProtocolType t);
/// send a fully encrypted hidden service frame
/// via a path on our pathset with path id p
bool
Send(const ProtocolFrame& f);
SharedSecret sharedKey;
ServiceInfo remoteIdent;
Introduction remoteIntro;
ConvoTag currentConvoTag;
path::PathSet* m_PathSet;
IDataHandler* m_DataHandler;
Endpoint* m_Endpoint;
uint64_t sequenceNo = 0;
llarp_time_t lastGoodSend = 0;
llarp_time_t createdAt;
llarp_time_t sendTimeout = 40 * 1000;
llarp_time_t connectTimeout = 60 * 1000;
bool markedBad = false;
virtual bool
ShiftIntroduction(bool rebuild = true)
{
(void)rebuild;
return true;
};
virtual void
UpdateIntroSet(bool randomizePath = false) = 0;
virtual bool
MarkCurrentIntroBad(llarp_time_t now) = 0;
private:
void
EncryptAndSendTo(const llarp_buffer_t& payload, ProtocolType t);
virtual void
AsyncGenIntro(const llarp_buffer_t& payload, ProtocolType t) = 0;
};
} // namespace service
} // namespace llarp
#endif

18
llarp/service/session.cpp Normal file
View File

@ -0,0 +1,18 @@
#include <service/session.hpp>
namespace llarp
{
namespace service
{
util::StatusObject
Session::ExtractStatus() const
{
util::StatusObject obj{{"lastUsed", lastUsed},
{"replyIntro", replyIntro.ExtractStatus()},
{"remote", remote.Addr().ToString()},
{"seqno", seqno},
{"intro", intro.ExtractStatus()}};
return obj;
};
} // namespace service
} // namespace llarp

41
llarp/service/session.hpp Normal file
View File

@ -0,0 +1,41 @@
#ifndef LLARP_SERVICE_SESSION_HPP
#define LLARP_SERVICE_SESSION_HPP
#include <crypto/types.hpp>
#include <path/path.hpp>
#include <service/Info.hpp>
#include <service/Intro.hpp>
#include <util/status.hpp>
#include <util/types.hpp>
namespace llarp
{
namespace service
{
struct Session
{
Introduction replyIntro;
SharedSecret sharedKey;
ServiceInfo remote;
Introduction intro;
llarp_time_t lastUsed = 0;
uint64_t seqno = 0;
util::StatusObject
ExtractStatus() const;
bool
IsExpired(llarp_time_t now,
llarp_time_t lifetime = (path::default_lifetime * 2)) const
{
if(now <= lastUsed)
return false;
return now - lastUsed > lifetime;
}
};
} // namespace service
} // namespace llarp
#endif

View File

@ -0,0 +1,61 @@
#include <service/tag_lookup_job.hpp>
#include <dht/messages/findintro.hpp>
#include <messages/dht.hpp>
#include <service/endpoint.hpp>
namespace llarp
{
namespace service
{
bool
CachedTagResult::HandleResponse(const std::set< IntroSet >& introsets)
{
auto now = parent->Now();
for(const auto& introset : introsets)
if(result.insert(introset).second)
lastModified = now;
LogInfo("Tag result for ", tag.ToString(), " got ", introsets.size(),
" results from lookup, have ", result.size(),
" cached last modified at ", lastModified, " is ",
now - lastModified, "ms old");
return true;
}
void
CachedTagResult::Expire(llarp_time_t now)
{
auto itr = result.begin();
while(itr != result.end())
{
if(itr->HasExpiredIntros(now))
{
LogInfo("Removing expired tag Entry ", itr->A.Name());
itr = result.erase(itr);
lastModified = now;
}
else
{
++itr;
}
}
}
routing::IMessage*
CachedTagResult::BuildRequestMessage(uint64_t txid)
{
routing::DHTMessage* msg = new routing::DHTMessage();
msg->M.emplace_back(new dht::FindIntroMessage(tag, txid));
lastRequest = parent->Now();
return msg;
}
TagLookupJob::TagLookupJob(Endpoint* parent, CachedTagResult* result)
: IServiceLookup(parent, parent->GenTXID(), "taglookup")
, m_result(result)
{
}
} // namespace service
} // namespace llarp

View File

@ -0,0 +1,79 @@
#ifndef LLARP_SERVICE_TAG_LOOKUP_JOB_HPP
#define LLARP_SERVICE_TAG_LOOKUP_JOB_HPP
#include <routing/message.hpp>
#include <service/IntroSet.hpp>
#include <service/lookup.hpp>
#include <service/tag.hpp>
#include <util/types.hpp>
#include <set>
namespace llarp
{
namespace service
{
struct Endpoint;
struct CachedTagResult
{
const static llarp_time_t TTL = 10000;
llarp_time_t lastRequest = 0;
llarp_time_t lastModified = 0;
std::set< IntroSet > result;
Tag tag;
Endpoint* parent;
CachedTagResult(const Tag& t, Endpoint* p) : tag(t), parent(p)
{
}
~CachedTagResult()
{
}
void
Expire(llarp_time_t now);
bool
ShouldRefresh(llarp_time_t now) const
{
if(now <= lastRequest)
return false;
return (now - lastRequest) > TTL;
}
llarp::routing::IMessage*
BuildRequestMessage(uint64_t txid);
bool
HandleResponse(const std::set< IntroSet >& results);
};
struct TagLookupJob : public IServiceLookup
{
TagLookupJob(Endpoint* parent, CachedTagResult* result);
~TagLookupJob()
{
}
llarp::routing::IMessage*
BuildRequestMessage() override
{
return m_result->BuildRequestMessage(txid);
}
bool
HandleResponse(const std::set< IntroSet >& results) override
{
return m_result->HandleResponse(results);
}
CachedTagResult* m_result;
};
} // namespace service
} // namespace llarp
#endif