1
1
Fork 0
mirror of https://github.com/oxen-io/lokinet synced 2023-12-14 06:53:00 +01:00
lokinet/llarp/dht/txholder.hpp

216 lines
5.5 KiB
C++
Raw Normal View History

2019-01-22 02:14:02 +01:00
#ifndef LLARP_DHT_TXHOLDER
#define LLARP_DHT_TXHOLDER
#include <dht/tx.hpp>
#include <dht/txowner.hpp>
#include <util/time.hpp>
2019-02-08 20:43:25 +01:00
#include <util/status.hpp>
2019-01-22 02:14:02 +01:00
#include <memory>
#include <unordered_map>
namespace llarp
{
namespace dht
{
template <typename K, typename V, typename K_Hash>
2019-04-19 17:10:26 +02:00
struct TXHolder
2019-01-22 02:14:02 +01:00
{
using TXPtr = std::unique_ptr<TX<K, V>>;
2019-01-22 02:14:02 +01:00
// tx who are waiting for a reply for each key
std::unordered_multimap<K, TXOwner, K_Hash> waiting;
2019-01-22 02:14:02 +01:00
// tx timesouts by key
std::unordered_map<K, llarp_time_t, K_Hash> timeouts;
2019-01-22 02:14:02 +01:00
// maps remote peer with tx to handle reply from them
std::unordered_map<TXOwner, TXPtr, TXOwner::Hash> tx;
2019-01-22 02:14:02 +01:00
const TX<K, V>*
2019-01-22 02:14:02 +01:00
GetPendingLookupFrom(const TXOwner& owner) const;
2019-02-11 18:14:43 +01:00
util::StatusObject
2019-04-19 17:10:26 +02:00
ExtractStatus() const
2019-02-08 20:43:25 +01:00
{
2019-02-11 18:14:43 +01:00
util::StatusObject obj{};
std::vector<util::StatusObject> txObjs, timeoutsObjs, waitingObjs;
std::transform(
tx.begin(),
tx.end(),
std::back_inserter(txObjs),
[](const auto& item) -> util::StatusObject {
return util::StatusObject{{"owner", item.first.ExtractStatus()},
{"tx", item.second->ExtractStatus()}};
});
obj["tx"] = txObjs;
2019-02-11 18:14:43 +01:00
std::transform(
timeouts.begin(),
timeouts.end(),
std::back_inserter(timeoutsObjs),
2019-02-11 18:14:43 +01:00
[](const auto& item) -> util::StatusObject {
return util::StatusObject{{"time", to_json(item.second)},
2020-03-01 16:59:19 +01:00
{"target", item.first.ExtractStatus()}};
2019-02-11 18:14:43 +01:00
});
obj["timeouts"] = timeoutsObjs;
std::transform(
waiting.begin(),
waiting.end(),
std::back_inserter(waitingObjs),
[](const auto& item) -> util::StatusObject {
return util::StatusObject{{"target", item.first.ExtractStatus()},
{"whoasked", item.second.ExtractStatus()}};
});
obj["waiting"] = waitingObjs;
2019-02-11 18:14:43 +01:00
return obj;
2019-02-08 20:43:25 +01:00
}
2019-01-22 02:14:02 +01:00
bool
HasLookupFor(const K& target) const
{
return timeouts.find(target) != timeouts.end();
}
bool
HasPendingLookupFrom(const TXOwner& owner) const
{
return GetPendingLookupFrom(owner) != nullptr;
}
void
NewTX(
const TXOwner& askpeer,
const TXOwner& whoasked,
const K& k,
TX<K, V>* t,
llarp_time_t requestTimeoutMS = 15s);
2019-01-22 02:14:02 +01:00
/// mark tx as not fond
void
NotFound(const TXOwner& from, const std::unique_ptr<Key_t>& next);
2019-01-22 02:14:02 +01:00
void
Found(const TXOwner& from, const K& k, const std::vector<V>& values)
2019-01-22 02:14:02 +01:00
{
Inform(from, k, values, true);
}
/// inform all watches for key of values found
void
Inform(
TXOwner from,
K key,
std::vector<V> values,
bool sendreply = false,
bool removeTimeouts = true);
2019-01-22 02:14:02 +01:00
void
Expire(llarp_time_t now);
};
template <typename K, typename V, typename K_Hash>
const TX<K, V>*
TXHolder<K, V, K_Hash>::GetPendingLookupFrom(const TXOwner& owner) const
2019-01-22 02:14:02 +01:00
{
auto itr = tx.find(owner);
if (itr == tx.end())
2019-01-22 02:14:02 +01:00
{
return nullptr;
}
2019-07-06 19:03:40 +02:00
return itr->second.get();
2019-01-22 02:14:02 +01:00
}
template <typename K, typename V, typename K_Hash>
2019-01-22 02:14:02 +01:00
void
TXHolder<K, V, K_Hash>::NewTX(
const TXOwner& askpeer,
const TXOwner& whoasked,
const K& k,
TX<K, V>* t,
llarp_time_t requestTimeoutMS)
2019-01-22 02:14:02 +01:00
{
(void)whoasked;
tx.emplace(askpeer, std::unique_ptr<TX<K, V>>(t));
2019-01-22 02:14:02 +01:00
auto count = waiting.count(k);
waiting.emplace(k, askpeer);
auto itr = timeouts.find(k);
if (itr == timeouts.end())
2019-01-22 02:14:02 +01:00
{
timeouts.emplace(k, time_now_ms() + requestTimeoutMS);
}
if (count == 0)
2019-01-22 02:14:02 +01:00
{
t->Start(askpeer);
}
}
template <typename K, typename V, typename K_Hash>
2019-01-22 02:14:02 +01:00
void
TXHolder<K, V, K_Hash>::NotFound(const TXOwner& from, const std::unique_ptr<Key_t>&)
2019-01-22 02:14:02 +01:00
{
2019-03-27 13:36:27 +01:00
auto txitr = tx.find(from);
if (txitr == tx.end())
2019-01-22 02:14:02 +01:00
{
return;
}
2020-01-14 01:49:09 +01:00
Inform(from, txitr->second->target, {}, true, true);
2019-01-22 02:14:02 +01:00
}
template <typename K, typename V, typename K_Hash>
2019-01-22 02:14:02 +01:00
void
TXHolder<K, V, K_Hash>::Inform(
TXOwner from, K key, std::vector<V> values, bool sendreply, bool removeTimeouts)
2019-01-22 02:14:02 +01:00
{
auto range = waiting.equal_range(key);
auto itr = range.first;
while (itr != range.second)
2019-01-22 02:14:02 +01:00
{
auto txitr = tx.find(itr->second);
if (txitr != tx.end())
2019-01-22 02:14:02 +01:00
{
for (const auto& value : values)
2019-01-22 02:14:02 +01:00
{
txitr->second->OnFound(from.node, value);
}
if (sendreply)
2019-01-22 02:14:02 +01:00
{
txitr->second->SendReply();
tx.erase(txitr);
}
}
++itr;
}
if (sendreply)
2019-01-22 02:14:02 +01:00
{
waiting.erase(key);
}
if (removeTimeouts)
2019-01-22 02:14:02 +01:00
{
timeouts.erase(key);
}
}
template <typename K, typename V, typename K_Hash>
2019-01-22 02:14:02 +01:00
void
TXHolder<K, V, K_Hash>::Expire(llarp_time_t now)
2019-01-22 02:14:02 +01:00
{
auto itr = timeouts.begin();
while (itr != timeouts.end())
2019-01-22 02:14:02 +01:00
{
if (now >= itr->second)
2019-01-22 02:14:02 +01:00
{
Inform(TXOwner{}, itr->first, {}, true, false);
itr = timeouts.erase(itr);
}
else
{
++itr;
}
}
}
} // namespace dht
} // namespace llarp
#endif