mirror of https://github.com/oxen-io/lokinet
Propagate Introset publishing redundantly
This commit is contained in:
parent
dc7828941f
commit
dff9aeb250
|
@ -123,10 +123,9 @@ namespace llarp
|
|||
|
||||
/// send introset to peer from source with S counter and excluding peers
|
||||
void
|
||||
PropagateIntroSetTo(const Key_t& source, uint64_t sourceTX,
|
||||
const service::EncryptedIntroSet& introset,
|
||||
const Key_t& peer, uint64_t S,
|
||||
const std::set< Key_t >& exclude) override;
|
||||
PropagateIntroSetTo(const Key_t& from, uint64_t txid,
|
||||
const service::EncryptedIntroSet& introset,
|
||||
const Key_t& tellpeer, bool relayed, uint64_t relayOrder);
|
||||
|
||||
/// initialize dht context and explore every exploreInterval milliseconds
|
||||
void
|
||||
|
@ -556,16 +555,15 @@ namespace llarp
|
|||
|
||||
void
|
||||
Context::PropagateIntroSetTo(const Key_t& from, uint64_t txid,
|
||||
const service::EncryptedIntroSet& introset,
|
||||
const Key_t& tellpeer, uint64_t S,
|
||||
const std::set< Key_t >& exclude)
|
||||
const service::EncryptedIntroSet& introset,
|
||||
const Key_t& tellpeer, bool relayed, uint64_t relayOrder)
|
||||
{
|
||||
TXOwner asker(from, txid);
|
||||
TXOwner peer(tellpeer, ++ids);
|
||||
const Key_t addr(introset.derivedSigningKey);
|
||||
_pendingIntrosetLookups.NewTX(
|
||||
peer, asker, addr,
|
||||
new PublishServiceJob(asker, introset, this, S, exclude));
|
||||
new PublishServiceJob(asker, introset, this, relayed, relayOrder));
|
||||
}
|
||||
|
||||
void
|
||||
|
|
|
@ -94,8 +94,7 @@ namespace llarp
|
|||
virtual void
|
||||
PropagateIntroSetTo(const Key_t& source, uint64_t sourceTX,
|
||||
const service::EncryptedIntroSet& introset,
|
||||
const Key_t& peer, uint64_t S,
|
||||
const std::set< Key_t >& exclude) = 0;
|
||||
const Key_t& peer, bool relayed, uint64_t relayOrder) = 0;
|
||||
|
||||
virtual void
|
||||
Init(const Key_t& us, AbstractRouter* router,
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
#include <messages/dht_immediate.hpp>
|
||||
#include <router/abstractrouter.hpp>
|
||||
#include <routing/dht_message.hpp>
|
||||
#include <nodedb.hpp>
|
||||
|
||||
namespace llarp
|
||||
{
|
||||
|
@ -18,13 +19,12 @@ namespace llarp
|
|||
llarp_buffer_t *val)
|
||||
{
|
||||
bool read = false;
|
||||
if(key == "E")
|
||||
{
|
||||
return BEncodeReadList(exclude, val);
|
||||
}
|
||||
if(!BEncodeMaybeReadDictEntry("I", introset, read, key, val))
|
||||
return false;
|
||||
if(!BEncodeMaybeReadDictInt("S", depth, read, key, val))
|
||||
if(!BEncodeMaybeReadDictInt("O", relayOrder, read, key, val))
|
||||
return false;
|
||||
uint64_t relayedInt = (relayed ? 1 : 0);
|
||||
if(!BEncodeMaybeReadDictInt("R", relayedInt, read, key, val))
|
||||
return false;
|
||||
if(!BEncodeMaybeReadDictInt("T", txID, read, key, val))
|
||||
return false;
|
||||
|
@ -40,44 +40,90 @@ namespace llarp
|
|||
{
|
||||
auto now = ctx->impl->Now();
|
||||
|
||||
if(depth > MaxPropagationDepth)
|
||||
{
|
||||
llarp::LogWarn("invalid propgagation depth value ", depth, " > ",
|
||||
MaxPropagationDepth);
|
||||
return false;
|
||||
}
|
||||
auto &dht = *ctx->impl;
|
||||
if(!introset.Verify(now))
|
||||
{
|
||||
llarp::LogWarn("invalid introset: ", introset);
|
||||
llarp::LogWarn("Received PublishIntroMessage with invalid introset: ",
|
||||
introset);
|
||||
// don't propogate or store
|
||||
replies.emplace_back(new GotIntroMessage({}, txID));
|
||||
return true;
|
||||
}
|
||||
|
||||
if(introset.IsExpired(now + llarp::service::MAX_INTROSET_TIME_DELTA))
|
||||
{
|
||||
// don't propogate or store
|
||||
llarp::LogWarn("Received PublishIntroMessage with expired Introset: ",
|
||||
introset);
|
||||
replies.emplace_back(new GotIntroMessage({}, txID));
|
||||
return true;
|
||||
}
|
||||
|
||||
const llarp::dht::Key_t addr(introset.derivedSigningKey);
|
||||
|
||||
now += llarp::service::MAX_INTROSET_TIME_DELTA;
|
||||
if(introset.IsExpired(now))
|
||||
// identify closest 4 routers
|
||||
auto closestRCs = dht.GetRouter()->nodedb()->FindClosestTo(addr, 4);
|
||||
if (closestRCs.size() != 4)
|
||||
{
|
||||
// don't propogate or store
|
||||
replies.emplace_back(new GotIntroMessage({}, txID));
|
||||
return true;
|
||||
llarp::LogWarn("Received PublishIntroMessage but only know ",
|
||||
closestRCs.size(), " nodes");
|
||||
replies.emplace_back(new GotIntroMessage({}, txID));
|
||||
return true;
|
||||
}
|
||||
dht.services()->PutNode(introset);
|
||||
replies.emplace_back(new GotIntroMessage({introset}, txID));
|
||||
Key_t peer;
|
||||
std::set< Key_t > exclude_propagate;
|
||||
for(const auto &e : exclude)
|
||||
exclude_propagate.insert(e);
|
||||
exclude_propagate.insert(From);
|
||||
exclude_propagate.insert(dht.OurKey());
|
||||
if(depth > 0
|
||||
&& dht.Nodes()->FindCloseExcluding(addr, peer, exclude_propagate))
|
||||
|
||||
// function to identify the closest 4 routers we know of for this introset
|
||||
auto propagateToClosestFour = [&, this]() {
|
||||
|
||||
// grab 1st & 2nd if we are relayOrder == 0, 3rd & 4th otherwise
|
||||
auto rc0 = (relayOrder == 0 ? closestRCs[0] : closestRCs[2]);
|
||||
auto rc1 = (relayOrder == 0 ? closestRCs[1] : closestRCs[3]);
|
||||
|
||||
Key_t peer0{rc0.pubkey};
|
||||
Key_t peer1{rc1.pubkey};
|
||||
|
||||
// TODO: handle case where we are peer0 or peer1
|
||||
|
||||
dht.PropagateIntroSetTo(From, txID, introset, peer0, false, 0);
|
||||
dht.PropagateIntroSetTo(From, txID, introset, peer1, false, 0);
|
||||
};
|
||||
|
||||
if (relayed)
|
||||
{
|
||||
dht.PropagateIntroSetTo(From, txID, introset, peer, depth - 1,
|
||||
exclude_propagate);
|
||||
if (relayOrder > 1)
|
||||
{
|
||||
llarp::LogWarn("Received PublishIntroMessage with invalid relayOrder: ",
|
||||
relayOrder);
|
||||
replies.emplace_back(new GotIntroMessage({}, txID));
|
||||
return true;
|
||||
}
|
||||
|
||||
propagateToClosestFour();
|
||||
|
||||
}
|
||||
else
|
||||
{
|
||||
bool found = false;
|
||||
for (const auto& rc : closestRCs)
|
||||
{
|
||||
if (rc.pubkey == dht.OurKey())
|
||||
{
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (found)
|
||||
{
|
||||
dht.services()->PutNode(introset);
|
||||
replies.emplace_back(new GotIntroMessage({introset}, txID));
|
||||
}
|
||||
else
|
||||
{
|
||||
// TODO: ensure this can't create a loop (reintroduce depth?)
|
||||
propagateToClosestFour();
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -88,11 +134,11 @@ namespace llarp
|
|||
return false;
|
||||
if(!BEncodeWriteDictMsgType(buf, "A", "I"))
|
||||
return false;
|
||||
if(!BEncodeWriteDictList("E", exclude, buf))
|
||||
return false;
|
||||
if(!BEncodeWriteDictEntry("I", introset, buf))
|
||||
return false;
|
||||
if(!BEncodeWriteDictInt("S", depth, buf))
|
||||
if(!BEncodeWriteDictInt("O", relayOrder, buf))
|
||||
return false;
|
||||
if(!BEncodeWriteDictInt("R", relayed, buf))
|
||||
return false;
|
||||
if(!BEncodeWriteDictInt("T", txID, buf))
|
||||
return false;
|
||||
|
|
|
@ -14,20 +14,19 @@ namespace llarp
|
|||
{
|
||||
static const uint64_t MaxPropagationDepth;
|
||||
llarp::service::EncryptedIntroSet introset;
|
||||
std::vector< Key_t > exclude;
|
||||
uint64_t depth = 0;
|
||||
bool relayed = false;
|
||||
uint64_t relayOrder = 0;
|
||||
uint64_t txID = 0;
|
||||
PublishIntroMessage() : IMessage({})
|
||||
{
|
||||
}
|
||||
|
||||
PublishIntroMessage(const llarp::service::EncryptedIntroSet& i,
|
||||
uint64_t tx, uint64_t s,
|
||||
std::vector< Key_t > _exclude = {})
|
||||
PublishIntroMessage(const llarp::service::EncryptedIntroSet& introset_,
|
||||
uint64_t tx, bool relayed_, uint64_t relayOrder_)
|
||||
: IMessage({})
|
||||
, introset(i)
|
||||
, exclude(std::move(_exclude))
|
||||
, depth(s)
|
||||
, introset(introset_)
|
||||
, relayed(relayed_)
|
||||
, relayOrder(relayOrder_)
|
||||
, txID(tx)
|
||||
{
|
||||
}
|
||||
|
|
|
@ -9,14 +9,13 @@ namespace llarp
|
|||
namespace dht
|
||||
{
|
||||
PublishServiceJob::PublishServiceJob(const TXOwner &asker,
|
||||
const service::EncryptedIntroSet &I,
|
||||
AbstractContext *ctx, uint64_t s,
|
||||
std::set< Key_t > exclude)
|
||||
const service::EncryptedIntroSet &introset_,
|
||||
AbstractContext *ctx, bool relayed_, uint64_t relayOrder_)
|
||||
: TX< Key_t, service::EncryptedIntroSet >(
|
||||
asker, Key_t{I.derivedSigningKey}, ctx)
|
||||
, S(s)
|
||||
, dontTell(std::move(exclude))
|
||||
, introset(I)
|
||||
asker, Key_t{introset_.derivedSigningKey}, ctx)
|
||||
, relayed(relayed_)
|
||||
, relayOrder(relayOrder_)
|
||||
, introset(introset_)
|
||||
{
|
||||
}
|
||||
|
||||
|
@ -36,14 +35,9 @@ namespace llarp
|
|||
void
|
||||
PublishServiceJob::Start(const TXOwner &peer)
|
||||
{
|
||||
std::vector< Key_t > exclude;
|
||||
for(const auto &router : dontTell)
|
||||
{
|
||||
exclude.push_back(router);
|
||||
}
|
||||
parent->DHTSendTo(
|
||||
peer.node.as_array(),
|
||||
new PublishIntroMessage(introset, peer.txid, S, exclude));
|
||||
new PublishIntroMessage(introset, peer.txid, relayed, relayOrder));
|
||||
}
|
||||
} // namespace dht
|
||||
} // namespace llarp
|
||||
|
|
|
@ -14,14 +14,13 @@ namespace llarp
|
|||
{
|
||||
struct PublishServiceJob : public TX< Key_t, service::EncryptedIntroSet >
|
||||
{
|
||||
uint64_t S;
|
||||
std::set< Key_t > dontTell;
|
||||
bool relayed;
|
||||
uint64_t relayOrder;
|
||||
service::EncryptedIntroSet introset;
|
||||
|
||||
PublishServiceJob(const TXOwner &asker,
|
||||
const service::EncryptedIntroSet &introset,
|
||||
AbstractContext *ctx, uint64_t s,
|
||||
std::set< Key_t > exclude);
|
||||
AbstractContext *ctx, bool relayed, uint64_t relayOrder);
|
||||
|
||||
bool
|
||||
Validate(const service::EncryptedIntroSet &introset) const override;
|
||||
|
|
|
@ -468,7 +468,7 @@ namespace llarp
|
|||
size_t published = 0;
|
||||
for(const auto& path : paths)
|
||||
{
|
||||
if(PublishIntroSetVia(i, r, path))
|
||||
if(PublishIntroSetVia(i, r, path, published))
|
||||
{
|
||||
published++;
|
||||
}
|
||||
|
@ -480,11 +480,13 @@ namespace llarp
|
|||
{
|
||||
EncryptedIntroSet m_IntroSet;
|
||||
Endpoint* m_Endpoint;
|
||||
uint64_t m_relayOrder;
|
||||
PublishIntroSetJob(Endpoint* parent, uint64_t id,
|
||||
EncryptedIntroSet introset)
|
||||
EncryptedIntroSet introset, uint64_t relayOrder)
|
||||
: IServiceLookup(parent, id, "PublishIntroSet")
|
||||
, m_IntroSet(std::move(introset))
|
||||
, m_Endpoint(parent)
|
||||
, m_relayOrder(relayOrder)
|
||||
{
|
||||
}
|
||||
|
||||
|
@ -493,7 +495,7 @@ namespace llarp
|
|||
{
|
||||
auto msg = std::make_shared< routing::DHTMessage >();
|
||||
msg->M.emplace_back(
|
||||
std::make_unique< dht::PublishIntroMessage >(m_IntroSet, txid, 5));
|
||||
std::make_unique< dht::PublishIntroMessage >(m_IntroSet, txid, true, m_relayOrder));
|
||||
return msg;
|
||||
}
|
||||
|
||||
|
@ -526,9 +528,9 @@ namespace llarp
|
|||
|
||||
bool
|
||||
Endpoint::PublishIntroSetVia(const EncryptedIntroSet& i, AbstractRouter* r,
|
||||
path::Path_ptr path)
|
||||
path::Path_ptr path, uint64_t relayOrder)
|
||||
{
|
||||
auto job = new PublishIntroSetJob(this, GenTXID(), i);
|
||||
auto job = new PublishIntroSetJob(this, GenTXID(), i, relayOrder);
|
||||
if(job->SendRequestViaPath(path, r))
|
||||
{
|
||||
m_state->m_LastPublishAttempt = Now();
|
||||
|
|
|
@ -172,7 +172,7 @@ namespace llarp
|
|||
|
||||
bool
|
||||
PublishIntroSetVia(const EncryptedIntroSet& i, AbstractRouter* r,
|
||||
path::Path_ptr p);
|
||||
path::Path_ptr p, uint64_t relayOrder);
|
||||
|
||||
bool
|
||||
HandleGotIntroMessage(
|
||||
|
|
|
@ -62,8 +62,7 @@ namespace llarp
|
|||
MOCK_METHOD6(PropagateIntroSetTo,
|
||||
void(const dht::Key_t& source, uint64_t sourceTX,
|
||||
const service::EncryptedIntroSet& introset,
|
||||
const dht::Key_t& peer, uint64_t S,
|
||||
const std::set< dht::Key_t >& exclude));
|
||||
const dht::Key_t& peer, bool relayed, uint64_t relayOrder));
|
||||
|
||||
MOCK_METHOD3(Init,
|
||||
void(const dht::Key_t&, AbstractRouter*, llarp_time_t));
|
||||
|
|
Loading…
Reference in New Issue