refactor dht code to be split up a bit more

This commit is contained in:
Jeff Becker 2018-07-11 09:20:14 -04:00
parent cf6275155a
commit 39e100d0b2
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05
33 changed files with 1557 additions and 1332 deletions

View File

@ -172,6 +172,14 @@ set(LIB_SRC
llarp/api/client.cpp
llarp/api/message.cpp
llarp/api/parser.cpp
llarp/dht/context.cpp
llarp/dht/decode.cpp
llarp/dht/dht_immediate.cpp
llarp/dht/find_router.cpp
llarp/dht/got_intro.cpp
llarp/dht/got_router.cpp
llarp/dht/search_job.cpp
llarp/dht/publish_intro.cpp
llarp/iwp/frame_header.cpp
llarp/iwp/frame_state.cpp
llarp/iwp/session.cpp

View File

@ -1,443 +1,5 @@
#ifndef LLARP_DHT_HPP_
#define LLARP_DHT_HPP_
#include <llarp/buffer.h>
#include <llarp/dht.h>
#include <llarp/router.h>
#include <llarp/router_contact.h>
#include <llarp/time.h>
#include <llarp/aligned.hpp>
#include <llarp/path_types.hpp>
#include <llarp/service.hpp>
#include <array>
#include <functional>
#include <map>
#include <set>
#include <unordered_map>
#include <vector>
namespace llarp
{
namespace dht
{
const size_t MAX_MSG_SIZE = 2048;
struct Key_t : public llarp::AlignedBuffer< 32 >
{
Key_t(const byte_t* val) : llarp::AlignedBuffer< 32 >(val)
{
}
Key_t() : llarp::AlignedBuffer< 32 >()
{
}
Key_t
operator^(const Key_t& other) const
{
Key_t dist;
for(size_t idx = 0; idx < 4; ++idx)
dist.l[idx] = l[idx] ^ other.l[idx];
return dist;
}
bool
operator<(const Key_t& other) const
{
return memcmp(data_l(), other.data_l(), 32) < 0;
}
};
struct RCNode
{
llarp_rc* rc;
Key_t ID;
RCNode() : rc(nullptr)
{
ID.Zero();
}
RCNode(llarp_rc* other) : rc(other)
{
ID = other->pubkey;
}
};
struct ISNode
{
llarp::service::IntroSet introset;
Key_t ID;
ISNode()
{
ID.Zero();
}
ISNode(const llarp::service::IntroSet& other)
{
introset = other;
other.A.CalculateAddress(ID);
}
};
struct SearchJob
{
const static uint64_t JobTimeout = 30000;
SearchJob();
SearchJob(const Key_t& requester, uint64_t requesterTX,
const Key_t& target, llarp_router_lookup_job* job,
const std::set< Key_t >& excludes);
void
Completed(const llarp_rc* router, bool timeout = false) const;
bool
IsExpired(llarp_time_t now) const;
llarp_router_lookup_job* job = nullptr;
llarp_time_t started;
Key_t requester;
uint64_t requesterTX;
Key_t target;
std::set< Key_t > exclude;
};
struct XorMetric
{
const Key_t& us;
XorMetric(const Key_t& ourKey) : us(ourKey){};
bool
operator()(const Key_t& left, const Key_t& right) const
{
return (us ^ left) < (us ^ right);
};
};
struct IMessage : public llarp::IBEncodeMessage
{
virtual ~IMessage(){};
/// construct
IMessage(const Key_t& from) : From(from)
{
}
virtual bool
HandleMessage(llarp_dht_context* dht,
std::vector< IMessage* >& replies) const = 0;
Key_t From;
PathID_t pathID;
};
IMessage*
DecodeMessage(const Key_t& from, llarp_buffer_t* buf, bool relayed = false);
bool
DecodeMesssageList(const Key_t& from, llarp_buffer_t* buf,
std::vector< IMessage* >& dst, bool relayed = false);
template < typename Val_t >
struct Bucket
{
typedef std::map< Key_t, Val_t, XorMetric > BucketStorage_t;
Bucket(const Key_t& us) : nodes(XorMetric(us)){};
bool
FindClosest(const Key_t& target, Key_t& result) const
{
Key_t mindist;
mindist.Fill(0xff);
for(const auto& item : nodes)
{
auto curDist = item.first ^ target;
if(curDist < mindist)
{
mindist = curDist;
result = item.first;
}
}
return nodes.size() > 0;
}
bool
FindCloseExcluding(const Key_t& target, Key_t& result,
const std::set< Key_t >& exclude) const
{
Key_t maxdist;
maxdist.Fill(0xff);
Key_t mindist;
mindist.Fill(0xff);
for(const auto& item : nodes)
{
if(exclude.find(item.first) != exclude.end())
continue;
auto curDist = item.first ^ target;
if(curDist < mindist)
{
mindist = curDist;
result = item.first;
}
}
return mindist < maxdist;
}
void
PutNode(const Val_t& val)
{
nodes[val.ID] = val;
}
void
DelNode(const Key_t& key)
{
auto itr = nodes.find(key);
if(itr != nodes.end())
nodes.erase(itr);
}
BucketStorage_t nodes;
};
struct Context
{
Context();
~Context();
llarp_dht_msg_handler custom_handler = nullptr;
SearchJob*
FindPendingTX(const Key_t& owner, uint64_t txid);
void
RemovePendingLookup(const Key_t& owner, uint64_t txid);
void
LookupRouter(const Key_t& target, const Key_t& whoasked,
uint64_t whoaskedTX, const Key_t& askpeer,
llarp_router_lookup_job* job = nullptr,
bool iterative = false, std::set< Key_t > excludes = {});
void
LookupRouterViaJob(llarp_router_lookup_job* job);
void
LookupRouterRelayed(const Key_t& requester, uint64_t txid,
const Key_t& target, bool recursive,
std::vector< IMessage* >& replies);
bool
RelayRequestForPath(const llarp::PathID_t& localPath,
const IMessage* msg);
void
Init(const Key_t& us, llarp_router* router);
void
QueueRouterLookup(llarp_router_lookup_job* job);
static void
handle_cleaner_timer(void* user, uint64_t orig, uint64_t left);
static void
queue_router_lookup(void* user);
llarp_router* router = nullptr;
// for router contacts
Bucket< RCNode >* nodes = nullptr;
// for introduction sets
Bucket< ISNode >* services = nullptr;
bool allowTransit = false;
const Key_t&
OurKey() const
{
return ourKey;
}
private:
void
ScheduleCleanupTimer();
void
CleanupTX();
uint64_t ids;
struct TXOwner
{
Key_t node;
uint64_t txid = 0;
bool
operator==(const TXOwner& other) const
{
return txid == other.txid && node == other.node;
}
bool
operator<(const TXOwner& other) const
{
return txid < other.txid || node < other.node;
}
};
struct TXOwnerHash
{
std::size_t
operator()(TXOwner const& o) const noexcept
{
std::size_t sz2;
memcpy(&sz2, &o.node[0], sizeof(std::size_t));
return o.txid ^ (sz2 << 1);
}
};
std::unordered_map< TXOwner, SearchJob, TXOwnerHash > pendingTX;
Key_t ourKey;
};
struct FindIntroMessage : public IMessage
{
uint64_t R = 0;
llarp::service::Address S;
uint64_t T = 0;
~FindIntroMessage();
bool
BEncode(llarp_buffer_t* buf) const;
bool
DecodeKey(llarp_buffer_t key, llarp_buffer_t* val);
virtual bool
HandleMessage(llarp_dht_context* ctx,
std::vector< IMessage* >& replies) const;
};
/// acknologement to PublishIntroMessage or reply to FinIntroMessage
struct GotIntroMessage : public IMessage
{
std::list< llarp::service::IntroSet > I;
uint64_t T;
GotIntroMessage(uint64_t tx, const llarp::service::IntroSet* i = nullptr);
~GotIntroMessage();
bool
BEncode(llarp_buffer_t* buf) const;
bool
DecodeKey(llarp_buffer_t key, llarp_buffer_t* val);
virtual bool
HandleMessage(llarp_dht_context* ctx,
std::vector< IMessage* >& replies) const;
};
struct PublishIntroMessage : public IMessage
{
llarp::service::IntroSet I;
uint64_t R = 0;
uint64_t S = 0;
uint64_t txID = 0;
bool hasS = false;
PublishIntroMessage() : IMessage({})
{
}
~PublishIntroMessage();
bool
BEncode(llarp_buffer_t* buf) const;
bool
DecodeKey(llarp_buffer_t key, llarp_buffer_t* val);
virtual bool
HandleMessage(llarp_dht_context* ctx,
std::vector< IMessage* >& replies) const;
};
struct GotRouterMessage : public IMessage
{
GotRouterMessage(const Key_t& from) : IMessage(from)
{
}
GotRouterMessage(const Key_t& from, uint64_t id, const llarp_rc* result)
: IMessage(from), txid(id)
{
if(result)
{
R.emplace_back();
llarp_rc_clear(&R.back());
llarp_rc_copy(&R.back(), result);
}
}
~GotRouterMessage();
bool
BEncode(llarp_buffer_t* buf) const;
bool
DecodeKey(llarp_buffer_t key, llarp_buffer_t* val);
virtual bool
HandleMessage(llarp_dht_context* ctx,
std::vector< IMessage* >& replies) const;
std::vector< llarp_rc > R;
uint64_t txid = 0;
uint64_t version = 0;
};
struct FindRouterMessage : public IMessage
{
FindRouterMessage(const Key_t& from) : IMessage(from)
{
}
FindRouterMessage(const Key_t& from, const Key_t& target, uint64_t id)
: IMessage(from), K(target), txid(id)
{
}
~FindRouterMessage();
bool
BEncode(llarp_buffer_t* buf) const;
bool
DecodeKey(llarp_buffer_t key, llarp_buffer_t* val);
virtual bool
HandleMessage(llarp_dht_context* ctx,
std::vector< IMessage* >& replies) const;
Key_t K;
bool iterative = false;
uint64_t txid = 0;
uint64_t version = 0;
};
} // namespace dht
} // namespace llarp
struct llarp_dht_context
{
llarp::dht::Context impl;
llarp_router* parent;
llarp_dht_context(llarp_router* router);
};
#endif
#include <llarp/dht/context.hpp>
#include <llarp/dht/key.hpp>
#include <llarp/dht/message.hpp>
#include <llarp/dht/messages/all.hpp>
#include <llarp/dht/node.hpp>

View File

@ -0,0 +1,77 @@
#ifndef LLARP_DHT_BUCKET_HPP
#define LLARP_DHT_BUCKET_HPP
#include <llarp/dht/kademlia.hpp>
#include <llarp/dht/key.hpp>
#include <map>
#include <set>
namespace llarp
{
namespace dht
{
template < typename Val_t >
struct Bucket
{
typedef std::map< Key_t, Val_t, XorMetric > BucketStorage_t;
Bucket(const Key_t& us) : nodes(XorMetric(us)){};
bool
FindClosest(const Key_t& target, Key_t& result) const
{
Key_t mindist;
mindist.Fill(0xff);
for(const auto& item : nodes)
{
auto curDist = item.first ^ target;
if(curDist < mindist)
{
mindist = curDist;
result = item.first;
}
}
return nodes.size() > 0;
}
bool
FindCloseExcluding(const Key_t& target, Key_t& result,
const std::set< Key_t >& exclude) const
{
Key_t maxdist;
maxdist.Fill(0xff);
Key_t mindist;
mindist.Fill(0xff);
for(const auto& item : nodes)
{
if(exclude.find(item.first) != exclude.end())
continue;
auto curDist = item.first ^ target;
if(curDist < mindist)
{
mindist = curDist;
result = item.first;
}
}
return mindist < maxdist;
}
void
PutNode(const Val_t& val)
{
nodes[val.ID] = val;
}
void
DelNode(const Key_t& key)
{
auto itr = nodes.find(key);
if(itr != nodes.end())
nodes.erase(itr);
}
BucketStorage_t nodes;
};
}
}
#endif

View File

@ -0,0 +1,125 @@
#ifndef LLARP_DHT_CONTEXT_HPP
#define LLARP_DHT_CONTEXT_HPP
#include <llarp/dht.h>
#include <llarp/router.h>
#include <llarp/dht/bucket.hpp>
#include <llarp/dht/key.hpp>
#include <llarp/dht/message.hpp>
#include <llarp/dht/node.hpp>
#include <llarp/dht/search_job.hpp>
#include <set>
namespace llarp
{
namespace dht
{
struct Context
{
Context();
~Context();
llarp_dht_msg_handler custom_handler = nullptr;
SearchJob*
FindPendingTX(const Key_t& owner, uint64_t txid);
void
RemovePendingLookup(const Key_t& owner, uint64_t txid);
void
LookupRouter(const Key_t& target, const Key_t& whoasked,
uint64_t whoaskedTX, const Key_t& askpeer,
llarp_router_lookup_job* job = nullptr,
bool iterative = false, std::set< Key_t > excludes = {});
void
LookupRouterViaJob(llarp_router_lookup_job* job);
void
LookupRouterRelayed(const Key_t& requester, uint64_t txid,
const Key_t& target, bool recursive,
std::vector< IMessage* >& replies);
bool
RelayRequestForPath(const llarp::PathID_t& localPath,
const IMessage* msg);
void
Init(const Key_t& us, llarp_router* router);
void
QueueRouterLookup(llarp_router_lookup_job* job);
static void
handle_cleaner_timer(void* user, uint64_t orig, uint64_t left);
static void
queue_router_lookup(void* user);
llarp_router* router = nullptr;
// for router contacts
Bucket< RCNode >* nodes = nullptr;
// for introduction sets
Bucket< ISNode >* services = nullptr;
bool allowTransit = false;
const Key_t&
OurKey() const
{
return ourKey;
}
private:
void
ScheduleCleanupTimer();
void
CleanupTX();
uint64_t ids;
struct TXOwner
{
Key_t node;
uint64_t txid = 0;
bool
operator==(const TXOwner& other) const
{
return txid == other.txid && node == other.node;
}
bool
operator<(const TXOwner& other) const
{
return txid < other.txid || node < other.node;
}
};
struct TXOwnerHash
{
std::size_t
operator()(TXOwner const& o) const noexcept
{
std::size_t sz2;
memcpy(&sz2, &o.node[0], sizeof(std::size_t));
return o.txid ^ (sz2 << 1);
}
};
std::unordered_map< TXOwner, SearchJob, TXOwnerHash > pendingTX;
Key_t ourKey;
};
}
}
struct llarp_dht_context
{
llarp::dht::Context impl;
llarp_router* parent;
llarp_dht_context(llarp_router* router);
};
#endif

View File

@ -0,0 +1,24 @@
#ifndef LLARP_DHT_KADEMLIA_HPP
#define LLARP_DHT_KADEMLIA_HPP
#include <llarp/dht/key.hpp>
namespace llarp
{
namespace dht
{
struct XorMetric
{
const Key_t& us;
XorMetric(const Key_t& ourKey) : us(ourKey){};
bool
operator()(const Key_t& left, const Key_t& right) const
{
return (us ^ left) < (us ^ right);
};
};
}
}
#endif

37
include/llarp/dht/key.hpp Normal file
View File

@ -0,0 +1,37 @@
#ifndef LLARP_DHT_KEY_HPP
#define LLARP_DHT_KEY_HPP
#include <llarp/aligned.hpp>
namespace llarp
{
namespace dht
{
struct Key_t : public llarp::AlignedBuffer< 32 >
{
Key_t(const byte_t* val) : llarp::AlignedBuffer< 32 >(val)
{
}
Key_t() : llarp::AlignedBuffer< 32 >()
{
}
Key_t
operator^(const Key_t& other) const
{
Key_t dist;
for(size_t idx = 0; idx < 4; ++idx)
dist.l[idx] = l[idx] ^ other.l[idx];
return dist;
}
bool
operator<(const Key_t& other) const
{
return memcmp(data_l(), other.data_l(), 32) < 0;
}
};
}
}
#endif

View File

@ -0,0 +1,41 @@
#ifndef LLARP_DHT_MESSAGE_HPP
#define LLARP_DHT_MESSAGE_HPP
#include <llarp/dht.h>
#include <llarp/bencode.hpp>
#include <llarp/dht/key.hpp>
#include <llarp/path_types.hpp>
#include <vector>
namespace llarp
{
namespace dht
{
constexpr size_t MAX_MSG_SIZE = 2048;
struct IMessage : public llarp::IBEncodeMessage
{
virtual ~IMessage(){};
/// construct
IMessage(const Key_t& from) : From(from)
{
}
virtual bool
HandleMessage(llarp_dht_context* dht,
std::vector< IMessage* >& replies) const = 0;
Key_t From;
PathID_t pathID;
};
IMessage*
DecodeMessage(const Key_t& from, llarp_buffer_t* buf, bool relayed = false);
bool
DecodeMesssageList(const Key_t& from, llarp_buffer_t* buf,
std::vector< IMessage* >& dst, bool relayed = false);
}
}
#endif

View File

@ -0,0 +1,8 @@
#ifndef LLARP_DHT_MESSAGES_ALL_HPP
#define LLARP_DHT_MESSAGES_ALL_HPP
#include <llarp/dht/messages/findintro.hpp>
#include <llarp/dht/messages/findrouter.hpp>
#include <llarp/dht/messages/gotintro.hpp>
#include <llarp/dht/messages/gotrouter.hpp>
#include <llarp/dht/messages/pubintro.hpp>
#endif

View File

@ -0,0 +1,30 @@
#ifndef LLARP_DHT_MESSAGES_FIND_INTRO_HPP
#define LLARP_DHT_MESSAGES_FIND_INTRO_HPP
#include <llarp/dht/message.hpp>
#include <llarp/service/address.hpp>
namespace llarp
{
namespace dht
{
struct FindIntroMessage : public IMessage
{
uint64_t R = 0;
llarp::service::Address S;
uint64_t T = 0;
~FindIntroMessage();
bool
BEncode(llarp_buffer_t* buf) const;
bool
DecodeKey(llarp_buffer_t key, llarp_buffer_t* val);
virtual bool
HandleMessage(llarp_dht_context* ctx,
std::vector< IMessage* >& replies) const;
};
}
}
#endif

View File

@ -0,0 +1,54 @@
#ifndef LLARP_DHT_MESSAGES_FIND_ROUTER_HPP
#define LLARP_DHT_MESSAGES_FIND_ROUTER_HPP
#include <llarp/dht/message.hpp>
namespace llarp
{
namespace dht
{
struct FindRouterMessage : public IMessage
{
FindRouterMessage(const Key_t& from) : IMessage(from)
{
}
FindRouterMessage(const Key_t& from, const Key_t& target, uint64_t id)
: IMessage(from), K(target), txid(id)
{
}
~FindRouterMessage();
bool
BEncode(llarp_buffer_t* buf) const;
bool
DecodeKey(llarp_buffer_t key, llarp_buffer_t* val);
virtual bool
HandleMessage(llarp_dht_context* ctx,
std::vector< IMessage* >& replies) const;
Key_t K;
bool iterative = false;
uint64_t txid = 0;
uint64_t version = 0;
};
/// variant of FindRouterMessage relayed via path
struct RelayedFindRouterMessage : public FindRouterMessage
{
RelayedFindRouterMessage(const Key_t& from) : FindRouterMessage(from)
{
}
/// handle a relayed FindRouterMessage, do a lookup on the dht and inform
/// the path of the result
/// TODO: smart path expiration logic needs to be implemented
virtual bool
HandleMessage(llarp_dht_context* ctx,
std::vector< IMessage* >& replies) const;
};
}
}
#endif

View File

@ -0,0 +1,32 @@
#ifndef LLARP_DHT_MESSAGES_GOT_INTRO_HPP
#define LLARP_DHT_MESSAGES_GOT_INTRO_HPP
#include <llarp/dht/message.hpp>
#include <llarp/service/IntroSet.hpp>
namespace llarp
{
namespace dht
{
/// acknologement to PublishIntroMessage or reply to FinIntroMessage
struct GotIntroMessage : public IMessage
{
std::list< llarp::service::IntroSet > I;
uint64_t T;
GotIntroMessage(uint64_t tx, const llarp::service::IntroSet* i = nullptr);
~GotIntroMessage();
bool
BEncode(llarp_buffer_t* buf) const;
bool
DecodeKey(llarp_buffer_t key, llarp_buffer_t* val);
virtual bool
HandleMessage(llarp_dht_context* ctx,
std::vector< IMessage* >& replies) const;
};
}
}
#endif

View File

@ -0,0 +1,43 @@
#ifndef LLARP_DHT_MESSAGES_GOT_ROUTER_HPP
#define LLARP_DHT_MESSAGES_GOT_ROUTER_HPP
#include <llarp/dht/message.hpp>
namespace llarp
{
namespace dht
{
struct GotRouterMessage : public IMessage
{
GotRouterMessage(const Key_t& from) : IMessage(from)
{
}
GotRouterMessage(const Key_t& from, uint64_t id, const llarp_rc* result)
: IMessage(from), txid(id)
{
if(result)
{
R.emplace_back();
llarp_rc_clear(&R.back());
llarp_rc_copy(&R.back(), result);
}
}
~GotRouterMessage();
bool
BEncode(llarp_buffer_t* buf) const;
bool
DecodeKey(llarp_buffer_t key, llarp_buffer_t* val);
virtual bool
HandleMessage(llarp_dht_context* ctx,
std::vector< IMessage* >& replies) const;
std::vector< llarp_rc > R;
uint64_t txid = 0;
uint64_t version = 0;
};
}
}
#endif

View File

@ -0,0 +1,34 @@
#ifndef LLARP_DHT_MESSAGES_PUB_INTRO_HPP
#define LLARP_DHT_MESSAGES_PUB_INTRO_HPP
#include <llarp/dht/message.hpp>
#include <llarp/service/IntroSet.hpp>
namespace llarp
{
namespace dht
{
struct PublishIntroMessage : public IMessage
{
llarp::service::IntroSet I;
uint64_t R = 0;
uint64_t S = 0;
uint64_t txID = 0;
bool hasS = false;
PublishIntroMessage() : IMessage({})
{
}
~PublishIntroMessage();
bool
BEncode(llarp_buffer_t* buf) const;
bool
DecodeKey(llarp_buffer_t key, llarp_buffer_t* val);
virtual bool
HandleMessage(llarp_dht_context* ctx,
std::vector< IMessage* >& replies) const;
};
}
}
#endif

View File

@ -0,0 +1,49 @@
#ifndef LLARP_DHT_NODE_HPP
#define LLARP_DHT_NODE_HPP
#include <llarp/router_contact.h>
#include <llarp/dht/key.hpp>
#include <llarp/service/IntroSet.hpp>
namespace llarp
{
namespace dht
{
struct RCNode
{
llarp_rc* rc;
Key_t ID;
RCNode() : rc(nullptr)
{
ID.Zero();
}
RCNode(llarp_rc* other) : rc(other)
{
ID = other->pubkey;
}
};
struct ISNode
{
llarp::service::IntroSet introset;
Key_t ID;
ISNode()
{
ID.Zero();
}
ISNode(const llarp::service::IntroSet& other)
{
introset = other;
other.A.CalculateAddress(ID);
}
};
}
}
#endif

View File

@ -0,0 +1,39 @@
#ifndef LLARP_DHT_SEARCH_JOB_HPP
#define LLARP_DHT_SEARCH_JOB_HPP
#include <llarp/dht.h>
#include <llarp/time.h>
#include <llarp/dht/key.hpp>
#include <set>
namespace llarp
{
namespace dht
{
struct SearchJob
{
const static uint64_t JobTimeout = 30000;
SearchJob();
SearchJob(const Key_t& requester, uint64_t requesterTX,
const Key_t& target, llarp_router_lookup_job* job,
const std::set< Key_t >& excludes);
void
Completed(const llarp_rc* router, bool timeout = false) const;
bool
IsExpired(llarp_time_t now) const;
llarp_router_lookup_job* job = nullptr;
llarp_time_t started;
Key_t requester;
uint64_t requesterTX;
Key_t target;
std::set< Key_t > exclude;
};
}
}
#endif

View File

@ -15,6 +15,7 @@
#include <llarp/routing/handler.hpp>
#include <llarp/routing/message.hpp>
#include <functional>
#include <list>
#include <map>
#include <mutex>

View File

@ -1,14 +0,0 @@
#ifndef LLARP_QUIC_H_
#define LLARP_QUIC_H_
#include "llarp/iwp/server.h"
struct llarp_quic_args
{
};
bool
quic_link_init(struct llarp_link* link, struct llarp_quic_args args,
struct llarp_msg_muxer* muxer);
#endif

View File

@ -13,9 +13,6 @@ namespace llarp
{
namespace service
{
std::string
AddressToString(const Address& addr);
struct Config
{
typedef std::list< std::pair< std::string, std::string > >

View File

@ -0,0 +1,17 @@
#ifndef LLARP_SERVICE_ADDRESS_HPP
#define LLARP_SERVICE_ADDRESS_HPP
#include <llarp/aligned.hpp>
#include <string>
namespace llarp
{
namespace service
{
typedef llarp::AlignedBuffer< 32 > Address;
std::string
AddressToString(const Address& addr);
}
}
#endif

View File

@ -1,16 +1,6 @@
#ifndef LLARP_SERVICE_TYPES_HPP
#define LLARP_SERVICE_TYPES_HPP
#include <llarp/aligned.hpp>
namespace llarp
{
namespace service
{
/// hidden service address
typedef llarp::AlignedBuffer< 32 > Address;
typedef llarp::AlignedBuffer< 16 > VanityNonce;
}
}
#include <llarp/service/address.hpp>
#include <llarp/service/vanity.hpp>
#endif

View File

@ -0,0 +1,14 @@
#ifndef LLARP_SERVICE_VANITY_HPP
#define LLARP_SERVICE_VANITY_HPP
#include <llarp/aligned.hpp>
namespace llarp
{
namespace service
{
/// hidden service address
typedef llarp::AlignedBuffer< 16 > VanityNonce;
}
}
#endif

View File

@ -10,861 +10,6 @@
#include <algorithm> // std::find
#include <set>
namespace llarp
{
DHTImmeidateMessage::~DHTImmeidateMessage()
{
for(auto &msg : msgs)
delete msg;
msgs.clear();
}
bool
DHTImmeidateMessage::DecodeKey(llarp_buffer_t key, llarp_buffer_t *buf)
{
if(llarp_buffer_eq(key, "m"))
return llarp::dht::DecodeMesssageList(remote.data(), buf, msgs);
if(llarp_buffer_eq(key, "v"))
{
if(!bencode_read_integer(buf, &version))
return false;
return version == LLARP_PROTO_VERSION;
}
// bad key
return false;
}
bool
DHTImmeidateMessage::BEncode(llarp_buffer_t *buf) const
{
if(!bencode_start_dict(buf))
return false;
// message type
if(!bencode_write_bytestring(buf, "a", 1))
return false;
if(!bencode_write_bytestring(buf, "m", 1))
return false;
// dht messages
if(!bencode_write_bytestring(buf, "m", 1))
return false;
// begin list
if(!bencode_start_list(buf))
return false;
for(const auto &msg : msgs)
{
if(!msg->BEncode(buf))
return false;
}
// end list
if(!bencode_end(buf))
return false;
// protocol version
if(!bencode_write_version_entry(buf))
return false;
return bencode_end(buf);
}
bool
DHTImmeidateMessage::HandleMessage(llarp_router *router) const
{
DHTImmeidateMessage *reply = new DHTImmeidateMessage(remote);
bool result = true;
for(auto &msg : msgs)
{
result &= msg->HandleMessage(router->dht, reply->msgs);
}
return result && router->SendToOrQueue(remote.data(), reply);
}
namespace dht
{
struct PathLookupInformer
{
llarp_router *router;
PathID_t pathID;
uint64_t txid;
PathLookupInformer(llarp_router *r, const PathID_t &id, uint64_t tx)
: router(r), pathID(id), txid(tx)
{
}
void
SendReply(const llarp::routing::IMessage *msg)
{
auto path = router->paths.GetByUpstream(router->pubkey(), pathID);
if(path == nullptr)
{
llarp::LogWarn("Path not found for relayed DHT message txid=", txid,
" pathid=", pathID);
return;
}
if(!path->SendRoutingMessage(msg, router))
llarp::LogWarn("Failed to send reply for relayed DHT message txid=",
txid, "pathid=", pathID);
}
static void
InformReply(llarp_router_lookup_job *job)
{
PathLookupInformer *self =
static_cast< PathLookupInformer * >(job->user);
llarp::routing::DHTMessage reply;
if(job->found)
{
if(llarp_rc_verify_sig(&self->router->crypto, &job->result))
{
reply.M.push_back(
new GotRouterMessage(job->target, self->txid, &job->result));
}
llarp_rc_free(&job->result);
llarp_rc_clear(&job->result);
}
else
{
reply.M.push_back(
new GotRouterMessage(job->target, self->txid, nullptr));
}
self->SendReply(&reply);
// TODO: is this okay?
delete self;
delete job;
}
};
/// variant of FindRouterMessage relayed via path
struct RelayedFindRouterMessage : public FindRouterMessage
{
RelayedFindRouterMessage(const Key_t &from) : FindRouterMessage(from)
{
}
/// handle a relayed FindRouterMessage, do a lookup on the dht and inform
/// the path of the result
/// TODO: smart path expiration logic needs to be implemented
virtual bool
HandleMessage(llarp_dht_context *ctx, std::vector< IMessage * > &replies)
{
auto &dht = ctx->impl;
/// lookup for us, send an immeidate reply
if(K == dht.OurKey())
{
auto path = dht.router->paths.GetByUpstream(K, pathID);
if(path)
{
replies.push_back(new GotRouterMessage(K, txid, &dht.router->rc));
return true;
}
return false;
}
llarp_router_lookup_job *job = new llarp_router_lookup_job;
PathLookupInformer *informer =
new PathLookupInformer(dht.router, pathID, txid);
job->user = informer;
job->hook = &PathLookupInformer::InformReply;
job->found = false;
job->dht = ctx;
memcpy(job->target, K, sizeof(job->target));
Key_t peer;
if(dht.nodes->FindClosest(K, peer))
dht.LookupRouter(K, dht.OurKey(), txid, peer, job);
return false;
}
};
GotIntroMessage::GotIntroMessage(uint64_t tx,
const llarp::service::IntroSet *i)
: IMessage({}), T(tx)
{
if(i)
{
I.push_back(*i);
}
}
GotIntroMessage::~GotIntroMessage()
{
}
bool
GotIntroMessage::HandleMessage(llarp_dht_context *ctx,
std::vector< IMessage * > &replies) const
{
// TODO: implement me?
auto path = ctx->impl.router->paths.GetLocalPathSet(pathID);
if(path)
{
return path->HandleGotIntroMessage(this);
}
return false;
}
bool
GotIntroMessage::DecodeKey(llarp_buffer_t key, llarp_buffer_t *buf)
{
if(llarp_buffer_eq(key, "I"))
{
return BEncodeReadList(I, buf);
}
bool read = false;
if(!BEncodeMaybeReadDictInt("T", T, read, key, buf))
return false;
if(!BEncodeMaybeReadDictInt("V", version, read, key, buf))
return false;
return read;
}
bool
GotIntroMessage::BEncode(llarp_buffer_t *buf) const
{
if(!bencode_start_dict(buf))
return false;
if(!BEncodeWriteDictMsgType(buf, "A", "G"))
return false;
if(!BEncodeWriteDictList("I", I, buf))
return false;
if(!BEncodeWriteDictInt(buf, "T", T))
return false;
if(!BEncodeWriteDictInt(buf, "V", version))
return false;
return bencode_end(buf);
}
PublishIntroMessage::~PublishIntroMessage()
{
}
bool
PublishIntroMessage::DecodeKey(llarp_buffer_t key, llarp_buffer_t *val)
{
bool read = false;
if(!BEncodeMaybeReadDictEntry("I", I, read, key, val))
return false;
if(!BEncodeMaybeReadDictInt("R", R, read, key, val))
return false;
if(llarp_buffer_eq(key, "S"))
{
read = true;
hasS = true;
if(!bencode_read_integer(val, &S))
return false;
}
if(!BEncodeMaybeReadDictInt("V", version, read, key, val))
return false;
return read;
}
bool
PublishIntroMessage::HandleMessage(llarp_dht_context *ctx,
std::vector< IMessage * > &replies) const
{
auto &dht = ctx->impl;
if(!I.VerifySignature(&dht.router->crypto))
{
llarp::LogWarn("invalid introset signature");
return false;
}
if(I.W && !I.W->IsValid(dht.router->crypto.shorthash))
{
llarp::LogWarn("proof of work not good enough for IntroSet");
return false;
}
// TODO: make this smarter
dht.services->PutNode(I);
replies.push_back(new GotIntroMessage(txID, &I));
return true;
}
bool
PublishIntroMessage::BEncode(llarp_buffer_t *buf) const
{
if(!bencode_start_dict(buf))
return false;
if(!BEncodeWriteDictMsgType(buf, "A", "I"))
return false;
if(!BEncodeWriteDictEntry("I", I, buf))
return false;
if(!BEncodeWriteDictInt(buf, "R", R))
return false;
if(hasS)
{
if(!BEncodeWriteDictInt(buf, "S", S))
return false;
}
if(!BEncodeWriteDictInt(buf, "V", LLARP_PROTO_VERSION))
return false;
return bencode_end(buf);
}
GotRouterMessage::~GotRouterMessage()
{
for(auto &rc : R)
llarp_rc_free(&rc);
R.clear();
}
bool
GotRouterMessage::BEncode(llarp_buffer_t *buf) const
{
if(!bencode_start_dict(buf))
return false;
// message type
if(!BEncodeWriteDictMsgType(buf, "A", "S"))
return false;
if(!BEncodeWriteDictList("R", R, buf))
return false;
// txid
if(!BEncodeWriteDictInt(buf, "T", txid))
return false;
// version
if(!BEncodeWriteDictInt(buf, "V", version))
return false;
return bencode_end(buf);
}
bool
GotRouterMessage::DecodeKey(llarp_buffer_t key, llarp_buffer_t *val)
{
if(llarp_buffer_eq(key, "R"))
{
return BEncodeReadList(R, val);
}
if(llarp_buffer_eq(key, "T"))
{
return bencode_read_integer(val, &txid);
}
bool read = false;
if(!BEncodeMaybeReadVersion("V", version, LLARP_PROTO_VERSION, read, key,
val))
return false;
return read;
}
bool
GotRouterMessage::HandleMessage(llarp_dht_context *ctx,
std::vector< IMessage * > &replies) const
{
auto &dht = ctx->impl;
auto pending = dht.FindPendingTX(From, txid);
if(pending)
{
if(R.size())
{
pending->Completed(&R[0]);
if(pending->requester != dht.OurKey())
{
replies.push_back(new GotRouterMessage(
pending->target, pending->requesterTX, &R[0]));
}
}
else
{
// iterate to next closest peer
Key_t nextPeer;
pending->exclude.insert(From);
if(pending->exclude.size() < 3
&& dht.nodes->FindCloseExcluding(pending->target, nextPeer,
pending->exclude))
{
llarp::LogInfo(pending->target, " was not found via ", From,
" iterating to next peer ", nextPeer,
" already asked ", pending->exclude.size(),
" other peers");
dht.LookupRouter(pending->target, pending->requester,
pending->requesterTX, nextPeer, nullptr, true,
pending->exclude);
}
else
{
llarp::LogInfo(pending->target, " was not found via ", From,
" and we won't look it up");
pending->Completed(nullptr);
if(pending->requester != dht.OurKey())
{
replies.push_back(new GotRouterMessage(
pending->target, pending->requesterTX, nullptr));
}
}
}
dht.RemovePendingLookup(From, txid);
return true;
}
llarp::LogWarn(
"Got response for DHT transaction we are not tracking, txid=", txid);
return false;
}
FindRouterMessage::~FindRouterMessage()
{
}
bool
FindRouterMessage::BEncode(llarp_buffer_t *buf) const
{
if(!bencode_start_dict(buf))
return false;
// message type
if(!bencode_write_bytestring(buf, "A", 1))
return false;
if(!bencode_write_bytestring(buf, "R", 1))
return false;
// iterative or not?
if(!bencode_write_bytestring(buf, "I", 1))
return false;
if(!bencode_write_int(buf, iterative ? 1 : 0))
return false;
// key
if(!bencode_write_bytestring(buf, "K", 1))
return false;
if(!bencode_write_bytestring(buf, K.data(), K.size()))
return false;
// txid
if(!bencode_write_bytestring(buf, "T", 1))
return false;
if(!bencode_write_uint64(buf, txid))
return false;
// version
if(!bencode_write_bytestring(buf, "V", 1))
return false;
if(!bencode_write_uint64(buf, version))
return false;
return bencode_end(buf);
}
bool
FindRouterMessage::DecodeKey(llarp_buffer_t key, llarp_buffer_t *val)
{
llarp_buffer_t strbuf;
if(llarp_buffer_eq(key, "I"))
{
uint64_t result;
if(!bencode_read_integer(val, &result))
return false;
iterative = result != 0;
return true;
}
if(llarp_buffer_eq(key, "K"))
{
if(!bencode_read_string(val, &strbuf))
return false;
if(strbuf.sz != K.size())
return false;
memcpy(K.data(), strbuf.base, K.size());
return true;
}
if(llarp_buffer_eq(key, "T"))
{
return bencode_read_integer(val, &txid);
}
if(llarp_buffer_eq(key, "V"))
{
return bencode_read_integer(val, &version);
}
return false;
}
bool
FindRouterMessage::HandleMessage(llarp_dht_context *ctx,
std::vector< IMessage * > &replies) const
{
auto &dht = ctx->impl;
if(!dht.allowTransit)
{
llarp::LogWarn("Got DHT lookup from ", From,
" when we are not allowing dht transit");
return false;
}
auto pending = dht.FindPendingTX(From, txid);
if(pending)
{
llarp::LogWarn("Got duplicate DHT lookup from ", From, " txid=", txid);
return false;
}
dht.LookupRouterRelayed(From, txid, K, !iterative, replies);
return true;
}
struct MessageDecoder
{
const Key_t &From;
bool firstKey = true;
IMessage *msg = nullptr;
bool relayed = false;
MessageDecoder(const Key_t &from) : From(from)
{
}
static bool
on_key(dict_reader *r, llarp_buffer_t *key)
{
llarp_buffer_t strbuf;
MessageDecoder *dec = static_cast< MessageDecoder * >(r->user);
// check for empty dict
if(!key)
return !dec->firstKey;
// first key
if(dec->firstKey)
{
if(!llarp_buffer_eq(*key, "A"))
return false;
if(!bencode_read_string(r->buffer, &strbuf))
return false;
// bad msg size?
if(strbuf.sz != 1)
return false;
switch(*strbuf.base)
{
case 'R':
if(dec->relayed)
dec->msg = new RelayedFindRouterMessage(dec->From);
else
dec->msg = new FindRouterMessage(dec->From);
break;
case 'S':
if(dec->relayed)
{
llarp::LogWarn(
"GotRouterMessage found when parsing relayed DHT "
"message");
return false;
}
else
dec->msg = new GotRouterMessage(dec->From);
break;
case 'I':
dec->msg = new PublishIntroMessage();
break;
default:
llarp::LogWarn("unknown dht message type: ", (char)*strbuf.base);
// bad msg type
return false;
}
dec->firstKey = false;
return dec->msg != nullptr;
}
else
return dec->msg->DecodeKey(*key, r->buffer);
}
};
IMessage *
DecodeMesssage(const Key_t &from, llarp_buffer_t *buf, bool relayed)
{
MessageDecoder dec(from);
dec.relayed = relayed;
dict_reader r;
r.user = &dec;
r.on_key = &MessageDecoder::on_key;
if(bencode_read_dict(buf, &r))
return dec.msg;
else
{
if(dec.msg)
delete dec.msg;
return nullptr;
}
}
struct ListDecoder
{
ListDecoder(const Key_t &from, std::vector< IMessage * > &list)
: From(from), l(list){};
bool relayed = false;
const Key_t &From;
std::vector< IMessage * > &l;
static bool
on_item(list_reader *r, bool has)
{
ListDecoder *dec = static_cast< ListDecoder * >(r->user);
if(!has)
return true;
auto msg = DecodeMesssage(dec->From, r->buffer, dec->relayed);
if(msg)
{
dec->l.push_back(msg);
return true;
}
else
return false;
}
};
bool
DecodeMesssageList(const Key_t &from, llarp_buffer_t *buf,
std::vector< IMessage * > &list, bool relayed)
{
ListDecoder dec(from, list);
dec.relayed = relayed;
list_reader r;
r.user = &dec;
r.on_item = &ListDecoder::on_item;
return bencode_read_list(buf, &r);
}
SearchJob::SearchJob()
{
started = 0;
requester.Zero();
target.Zero();
}
SearchJob::SearchJob(const Key_t &asker, uint64_t tx, const Key_t &key,
llarp_router_lookup_job *j,
const std::set< Key_t > &excludes)
: job(j)
, started(llarp_time_now_ms())
, requester(asker)
, requesterTX(tx)
, target(key)
, exclude(excludes)
{
}
void
SearchJob::Completed(const llarp_rc *router, bool timeout) const
{
if(job && job->hook)
{
if(router)
{
job->found = true;
llarp_rc_copy(&job->result, router);
}
job->hook(job);
}
}
bool
SearchJob::IsExpired(llarp_time_t now) const
{
return now - started >= JobTimeout;
}
Context::Context()
{
randombytes((byte_t *)&ids, sizeof(uint64_t));
}
Context::~Context()
{
if(nodes)
delete nodes;
if(services)
delete services;
}
void
Context::handle_cleaner_timer(void *u, uint64_t orig, uint64_t left)
{
if(left)
return;
Context *ctx = static_cast< Context * >(u);
ctx->CleanupTX();
ctx->ScheduleCleanupTimer();
}
void
Context::LookupRouterRelayed(const Key_t &requester, uint64_t txid,
const Key_t &target, bool recursive,
std::vector< IMessage * > &replies)
{
if(target == ourKey)
{
// we are the target, give them our RC
replies.push_back(new GotRouterMessage(requester, txid, &router->rc));
return;
}
Key_t next;
std::set< Key_t > excluding = {requester, ourKey};
if(nodes->FindCloseExcluding(target, next, excluding))
{
if(next == target)
{
// we know it
replies.push_back(
new GotRouterMessage(requester, txid, nodes->nodes[target].rc));
}
else if(recursive) // are we doing a recursive lookup?
{
if((requester ^ target) < (ourKey ^ target))
{
// we aren't closer to the target than next hop
// so we won't ask neighboor recursively, tell them we don't have it
llarp::LogInfo("we aren't closer to ", target, " than ", next,
" so we end it here");
replies.push_back(new GotRouterMessage(requester, txid, nullptr));
}
else
{
// yeah, ask neighboor recursively
LookupRouter(target, requester, txid, next);
}
}
else // otherwise tell them we don't have it
{
llarp::LogInfo("we don't have ", target,
" and this was an iterative request so telling ",
requester, " that we don't have it");
replies.push_back(new GotRouterMessage(requester, txid, nullptr));
}
}
else
{
// we don't know it and have no closer peers
llarp::LogInfo("we don't have ", target,
" and have no closer peers so telling ", requester,
" that we don't have it");
replies.push_back(new GotRouterMessage(requester, txid, nullptr));
}
}
void
Context::RemovePendingLookup(const Key_t &owner, uint64_t id)
{
TXOwner search;
search.node = owner;
search.txid = id;
auto itr = pendingTX.find(search);
if(itr == pendingTX.end())
return;
pendingTX.erase(itr);
}
SearchJob *
Context::FindPendingTX(const Key_t &owner, uint64_t id)
{
TXOwner search;
search.node = owner;
search.txid = id;
auto itr = pendingTX.find(search);
if(itr == pendingTX.end())
return nullptr;
else
return &itr->second;
}
void
Context::CleanupTX()
{
auto now = llarp_time_now_ms();
llarp::LogDebug("DHT tick");
auto itr = pendingTX.begin();
while(itr != pendingTX.end())
{
if(itr->second.IsExpired(now))
{
itr->second.Completed(nullptr, true);
itr = pendingTX.erase(itr);
}
else
++itr;
}
}
void
Context::Init(const Key_t &us, llarp_router *r)
{
router = r;
ourKey = us;
nodes = new Bucket< RCNode >(ourKey);
services = new Bucket< ISNode >(ourKey);
llarp::LogDebug("intialize dht with key ", ourKey);
}
void
Context::ScheduleCleanupTimer()
{
llarp_logic_call_later(router->logic,
{1000, this, &handle_cleaner_timer});
}
bool
Context::RelayRequestForPath(const llarp::PathID_t &id, const IMessage *msg)
{
llarp::routing::DHTMessage reply;
if(!msg->HandleMessage(router->dht, reply.M))
return false;
auto path = router->paths.GetByUpstream(router->pubkey(), id);
return path && path->SendRoutingMessage(&reply, router);
}
void
Context::LookupRouter(const Key_t &target, const Key_t &whoasked,
uint64_t txid, const Key_t &askpeer,
llarp_router_lookup_job *job, bool iterative,
std::set< Key_t > excludes)
{
if(target.IsZero() || whoasked.IsZero() || askpeer.IsZero())
{
return;
}
auto id = ++ids;
TXOwner ownerKey;
ownerKey.node = askpeer;
ownerKey.txid = id;
if(txid == 0)
txid = id;
pendingTX[ownerKey] = SearchJob(whoasked, txid, target, job, excludes);
llarp::LogInfo("Asking ", askpeer, " for router ", target, " for ",
whoasked);
auto msg = new llarp::DHTImmeidateMessage(askpeer);
auto dhtmsg = new FindRouterMessage(askpeer, target, id);
dhtmsg->iterative = iterative;
msg->msgs.push_back(dhtmsg);
router->SendToOrQueue(askpeer, msg);
}
void
Context::LookupRouterViaJob(llarp_router_lookup_job *job)
{
Key_t peer;
if(nodes->FindClosest(job->target, peer))
LookupRouter(job->target, ourKey, 0, peer, job);
else if(job->hook)
{
job->found = false;
job->hook(job);
}
}
void
Context::queue_router_lookup(void *user)
{
llarp_router_lookup_job *job =
static_cast< llarp_router_lookup_job * >(user);
job->dht->impl.LookupRouterViaJob(job);
}
} // namespace dht
} // namespace llarp
llarp_dht_context::llarp_dht_context(llarp_router *router)
{
parent = router;

211
llarp/dht/context.cpp Normal file
View File

@ -0,0 +1,211 @@
#include <llarp/dht/context.hpp>
#include <llarp/dht/messages/gotrouter.hpp>
#include <llarp/messages/dht.hpp>
#include <llarp/messages/dht_immediate.hpp>
#include "router.hpp"
namespace llarp
{
namespace dht
{
Context::Context()
{
randombytes((byte_t *)&ids, sizeof(uint64_t));
}
Context::~Context()
{
if(nodes)
delete nodes;
if(services)
delete services;
}
void
Context::handle_cleaner_timer(void *u, uint64_t orig, uint64_t left)
{
if(left)
return;
Context *ctx = static_cast< Context * >(u);
ctx->CleanupTX();
ctx->ScheduleCleanupTimer();
}
void
Context::LookupRouterRelayed(const Key_t &requester, uint64_t txid,
const Key_t &target, bool recursive,
std::vector< IMessage * > &replies)
{
if(target == ourKey)
{
// we are the target, give them our RC
replies.push_back(new GotRouterMessage(requester, txid, &router->rc));
return;
}
Key_t next;
std::set< Key_t > excluding = {requester, ourKey};
if(nodes->FindCloseExcluding(target, next, excluding))
{
if(next == target)
{
// we know it
replies.push_back(
new GotRouterMessage(requester, txid, nodes->nodes[target].rc));
}
else if(recursive) // are we doing a recursive lookup?
{
if((requester ^ target) < (ourKey ^ target))
{
// we aren't closer to the target than next hop
// so we won't ask neighboor recursively, tell them we don't have it
llarp::LogInfo("we aren't closer to ", target, " than ", next,
" so we end it here");
replies.push_back(new GotRouterMessage(requester, txid, nullptr));
}
else
{
// yeah, ask neighboor recursively
LookupRouter(target, requester, txid, next);
}
}
else // otherwise tell them we don't have it
{
llarp::LogInfo("we don't have ", target,
" and this was an iterative request so telling ",
requester, " that we don't have it");
replies.push_back(new GotRouterMessage(requester, txid, nullptr));
}
}
else
{
// we don't know it and have no closer peers
llarp::LogInfo("we don't have ", target,
" and have no closer peers so telling ", requester,
" that we don't have it");
replies.push_back(new GotRouterMessage(requester, txid, nullptr));
}
}
void
Context::RemovePendingLookup(const Key_t &owner, uint64_t id)
{
TXOwner search;
search.node = owner;
search.txid = id;
auto itr = pendingTX.find(search);
if(itr == pendingTX.end())
return;
pendingTX.erase(itr);
}
SearchJob *
Context::FindPendingTX(const Key_t &owner, uint64_t id)
{
TXOwner search;
search.node = owner;
search.txid = id;
auto itr = pendingTX.find(search);
if(itr == pendingTX.end())
return nullptr;
else
return &itr->second;
}
void
Context::CleanupTX()
{
auto now = llarp_time_now_ms();
llarp::LogDebug("DHT tick");
auto itr = pendingTX.begin();
while(itr != pendingTX.end())
{
if(itr->second.IsExpired(now))
{
itr->second.Completed(nullptr, true);
itr = pendingTX.erase(itr);
}
else
++itr;
}
}
void
Context::Init(const Key_t &us, llarp_router *r)
{
router = r;
ourKey = us;
nodes = new Bucket< RCNode >(ourKey);
services = new Bucket< ISNode >(ourKey);
llarp::LogDebug("intialize dht with key ", ourKey);
}
void
Context::ScheduleCleanupTimer()
{
llarp_logic_call_later(router->logic,
{1000, this, &handle_cleaner_timer});
}
bool
Context::RelayRequestForPath(const llarp::PathID_t &id, const IMessage *msg)
{
llarp::routing::DHTMessage reply;
if(!msg->HandleMessage(router->dht, reply.M))
return false;
auto path = router->paths.GetByUpstream(router->pubkey(), id);
return path && path->SendRoutingMessage(&reply, router);
}
void
Context::LookupRouter(const Key_t &target, const Key_t &whoasked,
uint64_t txid, const Key_t &askpeer,
llarp_router_lookup_job *job, bool iterative,
std::set< Key_t > excludes)
{
if(target.IsZero() || whoasked.IsZero() || askpeer.IsZero())
{
return;
}
auto id = ++ids;
TXOwner ownerKey;
ownerKey.node = askpeer;
ownerKey.txid = id;
if(txid == 0)
txid = id;
pendingTX[ownerKey] = SearchJob(whoasked, txid, target, job, excludes);
llarp::LogInfo("Asking ", askpeer, " for router ", target, " for ",
whoasked);
auto msg = new llarp::DHTImmeidateMessage(askpeer);
auto dhtmsg = new FindRouterMessage(askpeer, target, id);
dhtmsg->iterative = iterative;
msg->msgs.push_back(dhtmsg);
router->SendToOrQueue(askpeer, msg);
}
void
Context::LookupRouterViaJob(llarp_router_lookup_job *job)
{
Key_t peer;
if(nodes->FindClosest(job->target, peer))
LookupRouter(job->target, ourKey, 0, peer, job);
else if(job->hook)
{
job->found = false;
job->hook(job);
}
}
void
Context::queue_router_lookup(void *user)
{
llarp_router_lookup_job *job =
static_cast< llarp_router_lookup_job * >(user);
job->dht->impl.LookupRouterViaJob(job);
}
} // namespace dht
} // namespace llarp

129
llarp/dht/decode.cpp Normal file
View File

@ -0,0 +1,129 @@
#include <llarp/dht/context.hpp>
#include <llarp/dht/messages/all.hpp>
namespace llarp
{
namespace dht
{
struct MessageDecoder
{
const Key_t &From;
bool firstKey = true;
IMessage *msg = nullptr;
bool relayed = false;
MessageDecoder(const Key_t &from) : From(from)
{
}
static bool
on_key(dict_reader *r, llarp_buffer_t *key)
{
llarp_buffer_t strbuf;
MessageDecoder *dec = static_cast< MessageDecoder * >(r->user);
// check for empty dict
if(!key)
return !dec->firstKey;
// first key
if(dec->firstKey)
{
if(!llarp_buffer_eq(*key, "A"))
return false;
if(!bencode_read_string(r->buffer, &strbuf))
return false;
// bad msg size?
if(strbuf.sz != 1)
return false;
switch(*strbuf.base)
{
case 'R':
if(dec->relayed)
dec->msg = new RelayedFindRouterMessage(dec->From);
else
dec->msg = new FindRouterMessage(dec->From);
break;
case 'S':
if(dec->relayed)
{
llarp::LogWarn(
"GotRouterMessage found when parsing relayed DHT "
"message");
return false;
}
else
dec->msg = new GotRouterMessage(dec->From);
break;
case 'I':
dec->msg = new PublishIntroMessage();
break;
default:
llarp::LogWarn("unknown dht message type: ", (char)*strbuf.base);
// bad msg type
return false;
}
dec->firstKey = false;
return dec->msg != nullptr;
}
else
return dec->msg->DecodeKey(*key, r->buffer);
}
};
IMessage *
DecodeMesssage(const Key_t &from, llarp_buffer_t *buf, bool relayed)
{
MessageDecoder dec(from);
dec.relayed = relayed;
dict_reader r;
r.user = &dec;
r.on_key = &MessageDecoder::on_key;
if(bencode_read_dict(buf, &r))
return dec.msg;
else
{
if(dec.msg)
delete dec.msg;
return nullptr;
}
}
struct ListDecoder
{
ListDecoder(const Key_t &from, std::vector< IMessage * > &list)
: From(from), l(list){};
bool relayed = false;
const Key_t &From;
std::vector< IMessage * > &l;
static bool
on_item(list_reader *r, bool has)
{
ListDecoder *dec = static_cast< ListDecoder * >(r->user);
if(!has)
return true;
auto msg = DecodeMesssage(dec->From, r->buffer, dec->relayed);
if(msg)
{
dec->l.push_back(msg);
return true;
}
else
return false;
}
};
bool
DecodeMesssageList(const Key_t &from, llarp_buffer_t *buf,
std::vector< IMessage * > &list, bool relayed)
{
ListDecoder dec(from, list);
dec.relayed = relayed;
list_reader r;
r.user = &dec;
r.on_item = &ListDecoder::on_item;
return bencode_read_list(buf, &r);
}
}
}

View File

@ -0,0 +1,73 @@
#include <llarp/messages/dht_immediate.hpp>
#include "router.hpp"
namespace llarp
{
DHTImmeidateMessage::~DHTImmeidateMessage()
{
for(auto &msg : msgs)
delete msg;
msgs.clear();
}
bool
DHTImmeidateMessage::DecodeKey(llarp_buffer_t key, llarp_buffer_t *buf)
{
if(llarp_buffer_eq(key, "m"))
return llarp::dht::DecodeMesssageList(remote.data(), buf, msgs);
if(llarp_buffer_eq(key, "v"))
{
if(!bencode_read_integer(buf, &version))
return false;
return version == LLARP_PROTO_VERSION;
}
// bad key
return false;
}
bool
DHTImmeidateMessage::BEncode(llarp_buffer_t *buf) const
{
if(!bencode_start_dict(buf))
return false;
// message type
if(!bencode_write_bytestring(buf, "a", 1))
return false;
if(!bencode_write_bytestring(buf, "m", 1))
return false;
// dht messages
if(!bencode_write_bytestring(buf, "m", 1))
return false;
// begin list
if(!bencode_start_list(buf))
return false;
for(const auto &msg : msgs)
{
if(!msg->BEncode(buf))
return false;
}
// end list
if(!bencode_end(buf))
return false;
// protocol version
if(!bencode_write_version_entry(buf))
return false;
return bencode_end(buf);
}
bool
DHTImmeidateMessage::HandleMessage(llarp_router *router) const
{
DHTImmeidateMessage *reply = new DHTImmeidateMessage(remote);
bool result = true;
for(auto &msg : msgs)
{
result &= msg->HandleMessage(router->dht, reply->msgs);
}
return result && router->SendToOrQueue(remote.data(), reply);
}
}

195
llarp/dht/find_router.cpp Normal file
View File

@ -0,0 +1,195 @@
#include <llarp/dht/context.hpp>
#include <llarp/dht/messages/findrouter.hpp>
#include <llarp/dht/messages/gotrouter.hpp>
#include <llarp/messages/dht.hpp>
#include "router.hpp"
namespace llarp
{
namespace dht
{
struct PathLookupInformer
{
llarp_router *router;
PathID_t pathID;
uint64_t txid;
PathLookupInformer(llarp_router *r, const PathID_t &id, uint64_t tx)
: router(r), pathID(id), txid(tx)
{
}
void
SendReply(const llarp::routing::IMessage *msg)
{
auto path = router->paths.GetByUpstream(router->pubkey(), pathID);
if(path == nullptr)
{
llarp::LogWarn("Path not found for relayed DHT message txid=", txid,
" pathid=", pathID);
return;
}
if(!path->SendRoutingMessage(msg, router))
llarp::LogWarn("Failed to send reply for relayed DHT message txid=",
txid, "pathid=", pathID);
}
static void
InformReply(llarp_router_lookup_job *job)
{
PathLookupInformer *self =
static_cast< PathLookupInformer * >(job->user);
llarp::routing::DHTMessage reply;
if(job->found)
{
if(llarp_rc_verify_sig(&self->router->crypto, &job->result))
{
reply.M.push_back(
new GotRouterMessage(job->target, self->txid, &job->result));
}
llarp_rc_free(&job->result);
llarp_rc_clear(&job->result);
}
else
{
reply.M.push_back(
new GotRouterMessage(job->target, self->txid, nullptr));
}
self->SendReply(&reply);
// TODO: is this okay?
delete self;
delete job;
}
};
bool
RelayedFindRouterMessage::HandleMessage(
llarp_dht_context *ctx, std::vector< IMessage * > &replies) const
{
auto &dht = ctx->impl;
/// lookup for us, send an immeidate reply
if(K == dht.OurKey())
{
auto path = dht.router->paths.GetByUpstream(K, pathID);
if(path)
{
replies.push_back(new GotRouterMessage(K, txid, &dht.router->rc));
return true;
}
return false;
}
llarp_router_lookup_job *job = new llarp_router_lookup_job;
PathLookupInformer *informer =
new PathLookupInformer(dht.router, pathID, txid);
job->user = informer;
job->hook = &PathLookupInformer::InformReply;
job->found = false;
job->dht = ctx;
memcpy(job->target, K, sizeof(job->target));
Key_t peer;
if(dht.nodes->FindClosest(K, peer))
dht.LookupRouter(K, dht.OurKey(), txid, peer, job);
return false;
}
FindRouterMessage::~FindRouterMessage()
{
}
bool
FindRouterMessage::BEncode(llarp_buffer_t *buf) const
{
if(!bencode_start_dict(buf))
return false;
// message type
if(!bencode_write_bytestring(buf, "A", 1))
return false;
if(!bencode_write_bytestring(buf, "R", 1))
return false;
// iterative or not?
if(!bencode_write_bytestring(buf, "I", 1))
return false;
if(!bencode_write_int(buf, iterative ? 1 : 0))
return false;
// key
if(!bencode_write_bytestring(buf, "K", 1))
return false;
if(!bencode_write_bytestring(buf, K.data(), K.size()))
return false;
// txid
if(!bencode_write_bytestring(buf, "T", 1))
return false;
if(!bencode_write_uint64(buf, txid))
return false;
// version
if(!bencode_write_bytestring(buf, "V", 1))
return false;
if(!bencode_write_uint64(buf, version))
return false;
return bencode_end(buf);
}
bool
FindRouterMessage::DecodeKey(llarp_buffer_t key, llarp_buffer_t *val)
{
llarp_buffer_t strbuf;
if(llarp_buffer_eq(key, "I"))
{
uint64_t result;
if(!bencode_read_integer(val, &result))
return false;
iterative = result != 0;
return true;
}
if(llarp_buffer_eq(key, "K"))
{
if(!bencode_read_string(val, &strbuf))
return false;
if(strbuf.sz != K.size())
return false;
memcpy(K.data(), strbuf.base, K.size());
return true;
}
if(llarp_buffer_eq(key, "T"))
{
return bencode_read_integer(val, &txid);
}
if(llarp_buffer_eq(key, "V"))
{
return bencode_read_integer(val, &version);
}
return false;
}
bool
FindRouterMessage::HandleMessage(llarp_dht_context *ctx,
std::vector< IMessage * > &replies) const
{
auto &dht = ctx->impl;
if(!dht.allowTransit)
{
llarp::LogWarn("Got DHT lookup from ", From,
" when we are not allowing dht transit");
return false;
}
auto pending = dht.FindPendingTX(From, txid);
if(pending)
{
llarp::LogWarn("Got duplicate DHT lookup from ", From, " txid=", txid);
return false;
}
dht.LookupRouterRelayed(From, txid, K, !iterative, replies);
return true;
}
}
}

69
llarp/dht/got_intro.cpp Normal file
View File

@ -0,0 +1,69 @@
#include <llarp/dht/context.hpp>
#include <llarp/dht/messages/gotintro.hpp>
#include <llarp/messages/dht.hpp>
#include "router.hpp"
namespace llarp
{
namespace dht
{
GotIntroMessage::GotIntroMessage(uint64_t tx,
const llarp::service::IntroSet *i)
: IMessage({}), T(tx)
{
if(i)
{
I.push_back(*i);
}
}
GotIntroMessage::~GotIntroMessage()
{
}
bool
GotIntroMessage::HandleMessage(llarp_dht_context *ctx,
std::vector< IMessage * > &replies) const
{
// TODO: implement me better?
auto path = ctx->impl.router->paths.GetLocalPathSet(pathID);
if(path)
{
return path->HandleGotIntroMessage(this);
}
return false;
}
bool
GotIntroMessage::DecodeKey(llarp_buffer_t key, llarp_buffer_t *buf)
{
if(llarp_buffer_eq(key, "I"))
{
return BEncodeReadList(I, buf);
}
bool read = false;
if(!BEncodeMaybeReadDictInt("T", T, read, key, buf))
return false;
if(!BEncodeMaybeReadDictInt("V", version, read, key, buf))
return false;
return read;
}
bool
GotIntroMessage::BEncode(llarp_buffer_t *buf) const
{
if(!bencode_start_dict(buf))
return false;
if(!BEncodeWriteDictMsgType(buf, "A", "G"))
return false;
if(!BEncodeWriteDictList("I", I, buf))
return false;
if(!BEncodeWriteDictInt(buf, "T", T))
return false;
if(!BEncodeWriteDictInt(buf, "V", version))
return false;
return bencode_end(buf);
}
}
}

114
llarp/dht/got_router.cpp Normal file
View File

@ -0,0 +1,114 @@
#include <llarp/dht/context.hpp>
#include <llarp/dht/messages/gotrouter.hpp>
#include "router.hpp"
namespace llarp
{
namespace dht
{
GotRouterMessage::~GotRouterMessage()
{
for(auto &rc : R)
llarp_rc_free(&rc);
R.clear();
}
bool
GotRouterMessage::BEncode(llarp_buffer_t *buf) const
{
if(!bencode_start_dict(buf))
return false;
// message type
if(!BEncodeWriteDictMsgType(buf, "A", "S"))
return false;
if(!BEncodeWriteDictList("R", R, buf))
return false;
// txid
if(!BEncodeWriteDictInt(buf, "T", txid))
return false;
// version
if(!BEncodeWriteDictInt(buf, "V", version))
return false;
return bencode_end(buf);
}
bool
GotRouterMessage::DecodeKey(llarp_buffer_t key, llarp_buffer_t *val)
{
if(llarp_buffer_eq(key, "R"))
{
return BEncodeReadList(R, val);
}
if(llarp_buffer_eq(key, "T"))
{
return bencode_read_integer(val, &txid);
}
bool read = false;
if(!BEncodeMaybeReadVersion("V", version, LLARP_PROTO_VERSION, read, key,
val))
return false;
return read;
}
bool
GotRouterMessage::HandleMessage(llarp_dht_context *ctx,
std::vector< IMessage * > &replies) const
{
auto &dht = ctx->impl;
auto pending = dht.FindPendingTX(From, txid);
if(pending)
{
if(R.size())
{
pending->Completed(&R[0]);
if(pending->requester != dht.OurKey())
{
replies.push_back(new GotRouterMessage(
pending->target, pending->requesterTX, &R[0]));
}
}
else
{
// iterate to next closest peer
Key_t nextPeer;
pending->exclude.insert(From);
if(pending->exclude.size() < 3
&& dht.nodes->FindCloseExcluding(pending->target, nextPeer,
pending->exclude))
{
llarp::LogInfo(pending->target, " was not found via ", From,
" iterating to next peer ", nextPeer,
" already asked ", pending->exclude.size(),
" other peers");
dht.LookupRouter(pending->target, pending->requester,
pending->requesterTX, nextPeer, nullptr, true,
pending->exclude);
}
else
{
llarp::LogInfo(pending->target, " was not found via ", From,
" and we won't look it up");
pending->Completed(nullptr);
if(pending->requester != dht.OurKey())
{
replies.push_back(new GotRouterMessage(
pending->target, pending->requesterTX, nullptr));
}
}
}
dht.RemovePendingLookup(From, txid);
return true;
}
llarp::LogWarn(
"Got response for DHT transaction we are not tracking, txid=", txid);
return false;
}
}
}

View File

@ -0,0 +1,77 @@
#include <llarp/dht/context.hpp>
#include <llarp/dht/messages/pubintro.hpp>
#include <llarp/messages/dht.hpp>
#include "router.hpp"
namespace llarp
{
namespace dht
{
PublishIntroMessage::~PublishIntroMessage()
{
}
bool
PublishIntroMessage::DecodeKey(llarp_buffer_t key, llarp_buffer_t *val)
{
bool read = false;
if(!BEncodeMaybeReadDictEntry("I", I, read, key, val))
return false;
if(!BEncodeMaybeReadDictInt("R", R, read, key, val))
return false;
if(llarp_buffer_eq(key, "S"))
{
read = true;
hasS = true;
if(!bencode_read_integer(val, &S))
return false;
}
if(!BEncodeMaybeReadDictInt("V", version, read, key, val))
return false;
return read;
}
bool
PublishIntroMessage::HandleMessage(llarp_dht_context *ctx,
std::vector< IMessage * > &replies) const
{
auto &dht = ctx->impl;
if(!I.VerifySignature(&dht.router->crypto))
{
llarp::LogWarn("invalid introset signature");
return false;
}
if(I.W && !I.W->IsValid(dht.router->crypto.shorthash))
{
llarp::LogWarn("proof of work not good enough for IntroSet");
return false;
}
// TODO: make this smarter (?)
dht.services->PutNode(I);
replies.push_back(new GotIntroMessage(txID, &I));
return true;
}
bool
PublishIntroMessage::BEncode(llarp_buffer_t *buf) const
{
if(!bencode_start_dict(buf))
return false;
if(!BEncodeWriteDictMsgType(buf, "A", "I"))
return false;
if(!BEncodeWriteDictEntry("I", I, buf))
return false;
if(!BEncodeWriteDictInt(buf, "R", R))
return false;
if(hasS)
{
if(!BEncodeWriteDictInt(buf, "S", S))
return false;
}
if(!BEncodeWriteDictInt(buf, "V", LLARP_PROTO_VERSION))
return false;
return bencode_end(buf);
}
}
}

45
llarp/dht/search_job.cpp Normal file
View File

@ -0,0 +1,45 @@
#include <llarp/dht/search_job.hpp>
namespace llarp
{
namespace dht
{
SearchJob::SearchJob()
{
started = 0;
requester.Zero();
target.Zero();
}
SearchJob::SearchJob(const Key_t &asker, uint64_t tx, const Key_t &key,
llarp_router_lookup_job *j,
const std::set< Key_t > &excludes)
: job(j)
, started(llarp_time_now_ms())
, requester(asker)
, requesterTX(tx)
, target(key)
, exclude(excludes)
{
}
void
SearchJob::Completed(const llarp_rc *router, bool timeout) const
{
if(job && job->hook)
{
if(router)
{
job->found = true;
llarp_rc_copy(&job->result, router);
}
job->hook(job);
}
}
bool
SearchJob::IsExpired(llarp_time_t now) const
{
return now - started >= JobTimeout;
}
}
}

View File

@ -13,6 +13,7 @@
#include <llarp/dht.hpp>
#include <llarp/link_message.hpp>
#include <llarp/routing/handler.hpp>
#include <llarp/service.hpp>
#include "llarp/iwp/establish_job.hpp"
#include "crypto.hpp"

View File

@ -43,13 +43,9 @@ namespace llarp
job = this->jobs.front();
this->jobs.pop_front();
}
auto now = llarp_time_now_ms();
// do work
job->work(job->user);
auto after = llarp_time_now_ms();
auto dlt = after - now;
if(dlt > 10)
llarp::LogWarn("work took ", dlt, " ms");
delete job;
}
});

View File

@ -20,8 +20,10 @@ namespace llarp
Pool(size_t sz, const char* name);
void
QueueJob(const llarp_thread_job& job);
void
Join();
void
Stop();
std::vector< std::thread > threads;