1
1
Fork 0
mirror of https://github.com/oxen-io/lokinet synced 2023-12-14 06:53:00 +01:00

Merge pull request #1659 from majestrate/oxend-connection-reporting-2021-06-04

initial lokinet router testing
This commit is contained in:
Jeff 2021-06-08 12:52:20 -04:00 committed by GitHub
commit 835ba296b0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 714 additions and 148 deletions

View file

@ -103,6 +103,7 @@ add_library(liblokinet
dns/unbound_resolver.cpp
consensus/table.cpp
consensus/reachability_testing.cpp
bootstrap.cpp
context.cpp

View file

@ -0,0 +1,157 @@
#include "reachability_testing.hpp"
#include <chrono>
#include <llarp/router/abstractrouter.hpp>
#include <llarp/util/logging/logger.hpp>
#include <llarp/crypto/crypto.hpp>
using std::chrono::steady_clock;
namespace llarp::consensus
{
using fseconds = std::chrono::duration<float, std::chrono::seconds::period>;
using fminutes = std::chrono::duration<float, std::chrono::minutes::period>;
static void
check_incoming_tests_impl(
std::string_view name,
const time_point_t& now,
const time_point_t& startup,
detail::incoming_test_state& incoming)
{
const auto elapsed = now - std::max(startup, incoming.last_test);
bool failing = elapsed > reachability_testing::MAX_TIME_WITHOUT_PING;
bool whine = failing != incoming.was_failing
|| (failing && now - incoming.last_whine > reachability_testing::WHINING_INTERVAL);
incoming.was_failing = failing;
if (whine)
{
incoming.last_whine = now;
if (!failing)
{
LogInfo(name, " ping received; port is likely reachable again");
}
else
{
if (incoming.last_test.time_since_epoch() == 0s)
{
LogWarn("Have NEVER received ", name, " pings!");
}
else
{
LogWarn(
"Have not received ",
name,
" pings for a long time: ",
fminutes{elapsed}.count(),
" minutes");
}
LogWarn(
"Please check your ",
name,
" port. Not being reachable "
"over ",
name,
" may result in a deregistration!");
}
}
}
void
reachability_testing::check_incoming_tests(const time_point_t& now)
{
check_incoming_tests_impl("lokinet", now, startup, last);
}
void
reachability_testing::incoming_ping(const time_point_t& now)
{
last.last_test = now;
}
std::optional<RouterID>
reachability_testing::next_random(AbstractRouter* router, const time_point_t& now, bool requeue)
{
if (next_general_test > now)
return std::nullopt;
CSRNG rng;
next_general_test =
now + std::chrono::duration_cast<time_point_t::duration>(fseconds(TESTING_INTERVAL(rng)));
// Pull the next element off the queue, but skip ourself, any that are no longer registered, and
// any that are currently known to be failing (those are queued for testing separately).
RouterID my_pk{router->pubkey()};
while (!testing_queue.empty())
{
auto& pk = testing_queue.back();
std::optional<RouterID> sn;
if (pk != my_pk && !failing.count(pk))
sn = pk;
testing_queue.pop_back();
if (sn)
return sn;
}
if (!requeue)
return std::nullopt;
// FIXME: when a *new* node comes online we need to inject it into a random position in the SN
// list with probability (L/N) [L = current list size, N = potential list size]
//
// (FIXME: put this FIXME in a better place ;-) )
// We exhausted the queue so repopulate it and try again
testing_queue.clear();
const auto all = router->GetRouterWhitelist();
testing_queue.insert(testing_queue.begin(), all.begin(), all.end());
std::shuffle(testing_queue.begin(), testing_queue.end(), rng);
// Recurse with the rebuild list, but don't let it try rebuilding again
return next_random(router, now, false);
}
std::vector<std::pair<RouterID, int>>
reachability_testing::get_failing(const time_point_t& now)
{
// Our failing_queue puts the oldest retest times at the top, so pop them off into our result
// until the top node should be retested sometime in the future
std::vector<std::pair<RouterID, int>> result;
while (result.size() < MAX_RETESTS_PER_TICK && !failing_queue.empty())
{
auto& [pk, retest_time, failures] = failing_queue.top();
if (retest_time > now)
break;
result.emplace_back(pk, failures);
failing.erase(pk);
failing_queue.pop();
}
return result;
}
void
reachability_testing::add_failing_node(const RouterID& pk, int previous_failures)
{
using namespace std::chrono;
if (previous_failures < 0)
previous_failures = 0;
CSRNG rng;
auto next_test_in = duration_cast<time_point_t::duration>(
previous_failures * TESTING_BACKOFF + fseconds{TESTING_INTERVAL(rng)});
if (next_test_in > TESTING_BACKOFF_MAX)
next_test_in = TESTING_BACKOFF_MAX;
failing.insert(pk);
failing_queue.emplace(pk, steady_clock::now() + next_test_in, previous_failures + 1);
}
void
reachability_testing::remove_node_from_failing(const RouterID& pk)
{
failing.erase(pk);
}
} // namespace llarp::consensus

View file

@ -0,0 +1,148 @@
#pragma once
#include <chrono>
#include <queue>
#include <random>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include <llarp/util/time.hpp>
#include <llarp/router_id.hpp>
namespace llarp
{
struct AbstractRouter;
}
namespace llarp::consensus
{
namespace detail
{
using clock_t = std::chrono::steady_clock;
using time_point_t = std::chrono::time_point<clock_t>;
// Returns std::greater on the std::get<N>(v)th element value.
template <typename T, size_t N>
struct nth_greater
{
constexpr bool
operator()(const T& lhs, const T& rhs) const
{
return std::greater<std::tuple_element_t<N, T>>{}(std::get<N>(lhs), std::get<N>(rhs));
}
};
struct incoming_test_state
{
time_point_t last_test{};
time_point_t last_whine{};
bool was_failing = false;
};
} // namespace detail
using time_point_t = detail::time_point_t;
using clock_t = detail::clock_t;
// How often we tick the timer to check whether we need to do any tests.
constexpr auto REACHABILITY_TESTING_TIMER_INTERVAL = 50ms;
class reachability_testing
{
public:
// Distribution for the seconds between node tests: we throw in some randomness to avoid
// potential clustering of tests. (Note that there is some granularity here as the test timer
// only runs every REACHABILITY_TESTING_TIMER_INTERVAL).
std::normal_distribution<float> TESTING_INTERVAL{10.0, 3.0};
// The linear backoff after each consecutive test failure before we re-test. Specifically we
// schedule the next re-test for (TESTING_BACKOFF*previous_failures) + TESTING_INTERVAL(rng).
inline static constexpr auto TESTING_BACKOFF = 10s;
// The upper bound for the re-test interval.
inline static constexpr auto TESTING_BACKOFF_MAX = 2min;
// The maximum number of nodes that we will re-test at once (i.e. per TESTING_TIMING_INTERVAL);
// mainly intended to throttle ourselves if, for instance, our own connectivity loss makes us
// accumulate tons of nodes to test all at once. (Despite the random intervals, this can happen
// if we also get decommissioned during which we can't test at all but still have lots of
// failing nodes we want to test right away when we get recommissioned).
inline static constexpr int MAX_RETESTS_PER_TICK = 4;
// Maximum time without a ping before we start whining about it.
//
// We have a probability of about 0.368* of *not* getting pinged within a ping interval (10s),
// and so the probability of not getting a ping for 2 minutes (i.e. 12 test spans) just because
// we haven't been selected is extremely small (0.0000061). It also coincides nicely with
// blockchain time (i.e. two minutes) and our max testing backoff.
//
// * = approx value of ((n-1)/n)^n for non-tiny values of n
inline static constexpr auto MAX_TIME_WITHOUT_PING = 2min;
// How often we whine in the logs about being unreachable
inline static constexpr auto WHINING_INTERVAL = 2min;
private:
// Queue of pubkeys of service nodes to test; we pop off the back of this until the queue
// empties then we refill it with a shuffled list of all pubkeys then pull off of it until it is
// empty again, etc.
std::vector<RouterID> testing_queue;
// The next time for a general test
time_point_t next_general_test = time_point_t::min();
// When we started, so that we know not to hold off on whining about no pings for a while.
const time_point_t startup = clock_t::now();
// Pubkeys, next test times, and sequential failure counts of service nodes that are currently
// in "failed" status along with the last time they failed; we retest them first after 10s then
// back off linearly by an additional 10s up to a max testing interval of 2m30s, until we get a
// successful response.
using FailingPK = std::tuple<RouterID, time_point_t, int>;
std::priority_queue<FailingPK, std::vector<FailingPK>, detail::nth_greater<FailingPK, 1>>
failing_queue;
std::unordered_set<RouterID> failing;
// Track the last time *this node* was tested by other network nodes; used to detect and warn
// about possible network issues.
detail::incoming_test_state last;
public:
// If it is time to perform another random test, this returns the next node to test from the
// testing queue and returns it, also updating the timer for the next test. If it is not yet
// time, or if the queue is empty and cannot current be replenished, returns std::nullopt. If
// the queue empties then this builds a new one by shuffling current public keys in the swarm's
// "all nodes" then starts using the new queue for this an subsequent calls.
//
// `requeue` is mainly for internal use: if false it avoids rebuilding the queue if we run
// out (and instead just return nullopt).
std::optional<RouterID>
next_random(
AbstractRouter* router, const time_point_t& now = clock_t::now(), bool requeue = true);
// Removes and returns up to MAX_RETESTS_PER_TICK nodes that are due to be tested (i.e.
// next-testing-time <= now). Returns [snrecord, #previous-failures] for each.
std::vector<std::pair<RouterID, int>>
get_failing(const time_point_t& now = clock_t::now());
// Adds a bad node pubkey to the failing list, to be re-tested soon (with a backoff depending on
// `failures`; see TESTING_BACKOFF). `previous_failures` should be the number of previous
// failures *before* this one, i.e. 0 for a random general test; or the failure count returned
// by `get_failing` for repeated failures.
void
add_failing_node(const RouterID& pk, int previous_failures = 0);
/// removes the public key from the failing set
void
remove_node_from_failing(const RouterID& pk);
// Called when this router receives an incomming session
void
incoming_ping(const time_point_t& now = clock_t::now());
// Check whether we received incoming pings recently
void
check_incoming_tests(const time_point_t& now = clock_t::now());
};
} // namespace llarp::consensus

View file

@ -382,7 +382,7 @@ namespace llarp
replies.emplace_back(new GotRouterMessage(requester, txid, {router->rc()}, false));
return;
}
if (not GetRouter()->ConnectionToRouterAllowed(target.as_array()))
if (not GetRouter()->SessionToRouterAllowed(target.as_array()))
{
// explicitly not allowed
replies.emplace_back(new GotRouterMessage(requester, txid, {}, false));

View file

@ -34,7 +34,7 @@ namespace llarp
Key_t peer;
// check if we know this in our nodedb first
if (not dht.GetRouter()->ConnectionToRouterAllowed(targetKey))
if (not dht.GetRouter()->SessionToRouterAllowed(targetKey))
{
// explicitly disallowed by network
replies.emplace_back(new GotRouterMessage(k, txid, {}, false));

View file

@ -124,7 +124,7 @@ namespace llarp
++itr;
}
if (not m_Router->ConnectionToRouterAllowed(*rid))
if (not m_Router->PathToRouterAllowed(*rid))
return false;
ObtainSNodeSession(*rid, [data = payload.copy(), type](auto session) {
@ -150,7 +150,7 @@ namespace llarp
return false;
if (auto* rid = std::get_if<RouterID>(&addr))
{
if (m_SNodeKeys.count(PubKey{*rid}) or m_Router->ConnectionToRouterAllowed(*rid))
if (m_SNodeKeys.count(PubKey{*rid}) or m_Router->PathToRouterAllowed(*rid))
{
ObtainSNodeSession(
*rid, [hook, routerID = *rid](std::shared_ptr<exit::BaseSession> session) {
@ -338,7 +338,7 @@ namespace llarp
void
ExitEndpoint::ObtainSNodeSession(const RouterID& router, exit::SessionReadyFunc obtainCb)
{
if (not m_Router->rcLookupHandler().RemoteIsAllowed(router))
if (not m_Router->rcLookupHandler().SessionIsAllowed(router))
{
obtainCb(nullptr);
return;

View file

@ -35,6 +35,12 @@ namespace llarp
virtual bool
HasSessionTo(const RouterID& remote) const = 0;
// it is fine to have both an inbound and outbound session with
// another relay, and is useful for network testing. This test
// is more specific for use with "should we connect outbound?"
virtual bool
HasOutboundSessionTo(const RouterID& remote) const = 0;
/// return true if the session with this pubkey is a client
/// return false if the session with this pubkey is a router
/// return std::nullopt we have no session with this pubkey

View file

@ -60,6 +60,17 @@ namespace llarp
return GetLinkWithSessionTo(remote) != nullptr;
}
bool
LinkManager::HasOutboundSessionTo(const RouterID& remote) const
{
for (const auto& link : outboundLinks)
{
if (link->HasSessionTo(remote))
return true;
}
return false;
}
std::optional<bool>
LinkManager::SessionIsClient(RouterID remote) const
{
@ -69,11 +80,8 @@ namespace llarp
if (session)
return not session->IsRelay();
}
for (const auto& link : outboundLinks)
{
if (link->HasSessionTo(remote))
return false;
}
if (HasOutboundSessionTo(remote))
return false;
return std::nullopt;
}

View file

@ -33,6 +33,9 @@ namespace llarp
bool
HasSessionTo(const RouterID& remote) const override;
bool
HasOutboundSessionTo(const RouterID& remote) const override;
std::optional<bool>
SessionIsClient(RouterID remote) const override;

View file

@ -279,7 +279,7 @@ namespace llarp
#endif
}
if (!self->context->Router()->ConnectionToRouterAllowed(self->hop->info.upstream))
if (not self->context->Router()->PathToRouterAllowed(self->hop->info.upstream))
{
// we are not allowed to forward it ... now what?
llarp::LogError(

View file

@ -12,6 +12,7 @@
#include <llarp/router_contact.hpp>
#include <llarp/tooling/router_event.hpp>
#include <llarp/peerstats/peer_db.hpp>
#include <llarp/consensus/reachability_testing.hpp>
#include <optional>
@ -283,14 +284,21 @@ namespace llarp
/// set router's service node whitelist
virtual void
SetRouterWhitelist(const std::vector<RouterID> routers) = 0;
SetRouterWhitelist(
const std::vector<RouterID>& whitelist, const std::vector<RouterID>& greylist) = 0;
virtual std::unordered_set<RouterID>
GetRouterWhitelist() const = 0;
/// visit each connected link session
virtual void
ForEachPeer(std::function<void(const ILinkSession*, bool)> visit, bool randomize) const = 0;
virtual bool
ConnectionToRouterAllowed(const RouterID& router) const = 0;
SessionToRouterAllowed(const RouterID& router) const = 0;
virtual bool
PathToRouterAllowed(const RouterID& router) const = 0;
/// return true if we have an exit as a client
virtual bool

View file

@ -33,13 +33,20 @@ namespace llarp
RemoveValidRouter(const RouterID& router) = 0;
virtual void
SetRouterWhitelist(const std::vector<RouterID>& routers) = 0;
SetRouterWhitelist(
const std::vector<RouterID>& whitelist, const std::vector<RouterID>& greylist) = 0;
virtual void
GetRC(const RouterID& router, RCRequestCallback callback, bool forceLookup = false) = 0;
virtual bool
RemoteIsAllowed(const RouterID& remote) const = 0;
PathIsAllowed(const RouterID& remote) const = 0;
virtual bool
SessionIsAllowed(const RouterID& remote) const = 0;
virtual bool
IsGreylisted(const RouterID& remote) const = 0;
virtual bool
CheckRC(const RouterContact& rc) const = 0;

View file

@ -26,12 +26,11 @@ namespace llarp
const RouterID& remote, const ILinkMessage& msg, SendStatusHandler callback)
{
// if the destination is invalid, callback with failure and return
if (not _linkManager->SessionIsClient(remote) and not _lookupHandler->RemoteIsAllowed(remote))
if (not _linkManager->SessionIsClient(remote) and not _lookupHandler->SessionIsAllowed(remote))
{
DoCallback(callback, SendStatus::InvalidRouter);
return true;
}
const uint16_t priority = msg.Priority();
std::array<byte_t, MAX_LINK_MSG_SIZE> linkmsg_buffer;
llarp_buffer_t buf(linkmsg_buffer);

View file

@ -13,6 +13,8 @@
#include <llarp/crypto/crypto.hpp>
#include <utility>
#include <llarp/rpc/lokid_rpc_client.hpp>
namespace llarp
{
struct PendingSession
@ -35,18 +37,18 @@ namespace llarp
// TODO: do we want to keep it
const auto router = RouterID(session->GetPubKey());
const bool isOutbound = not session->IsInbound();
const std::string remoteType = session->GetRemoteRC().IsPublicRouter() ? "router" : "client";
LogInfo("session with ", remoteType, " [", router, "] established");
LogInfo(
"session with ", remoteType, " [", router, "] ", isOutbound ? "established" : "received");
if (not _rcLookup->RemoteIsAllowed(router))
if (not _rcLookup->SessionIsAllowed(router))
{
FinalizeRequest(router, SessionResult::InvalidRouter);
return false;
}
auto func = std::bind(&OutboundSessionMaker::VerifyRC, this, session->GetRemoteRC());
work(func);
work([this, rc = session->GetRemoteRC()] { VerifyRC(rc); });
return true;
}
@ -54,13 +56,9 @@ namespace llarp
void
OutboundSessionMaker::OnConnectTimeout(ILinkSession* session)
{
// TODO: retry/num attempts
LogWarn(
"Session establish attempt to ",
RouterID(session->GetPubKey()),
" timed out.",
session->GetRemoteEndpoint());
FinalizeRequest(session->GetPubKey(), SessionResult::Timeout);
const auto router = RouterID(session->GetPubKey());
LogWarn("Session establish attempt to ", router, " timed out.", session->GetRemoteEndpoint());
FinalizeRequest(router, SessionResult::Timeout);
}
void
@ -76,6 +74,7 @@ namespace llarp
if (HavePendingSessionTo(router))
{
LogDebug("has pending session to", router);
return;
}
@ -133,7 +132,7 @@ namespace llarp
break;
exclude.insert(other.pubkey);
if (not _rcLookup->RemoteIsAllowed(other.pubkey))
if (not _rcLookup->SessionIsAllowed(other.pubkey))
{
continue;
}
@ -227,12 +226,20 @@ namespace llarp
{
_loop->call([this, router] { DoEstablish(router); });
}
else if (_linkManager->HasSessionTo(router))
{
FinalizeRequest(router, SessionResult::Establish);
}
else
{
FinalizeRequest(router, SessionResult::NoLink);
}
}
bool
OutboundSessionMaker::ShouldConnectTo(const RouterID& router) const
{
if (router == us or not _rcLookup->RemoteIsAllowed(router))
if (router == us or not _rcLookup->SessionIsAllowed(router))
return false;
size_t numPending = 0;
{
@ -240,8 +247,10 @@ namespace llarp
if (pendingSessions.find(router) == pendingSessions.end())
numPending += pendingSessions.size();
}
if (_linkManager->HasSessionTo(router))
if (_linkManager->HasOutboundSessionTo(router))
return false;
if (_router->IsServiceNode())
return true;
return _linkManager->NumberOfConnectedRouters() + numPending < maxConnectedRouters;
}
@ -263,6 +272,7 @@ namespace llarp
{
if (not HavePendingSessionTo(router))
{
LogError("no pending session to ", router);
return;
}
@ -276,6 +286,7 @@ namespace llarp
else
{
LogError("RCRequestResult::Success but null rc pointer given");
InvalidRouter(router);
}
break;
case RCRequestResult::InvalidRouter:
@ -285,6 +296,7 @@ namespace llarp
RouterNotFound(router);
break;
default:
RouterNotFound(router);
break;
}
}

View file

@ -33,17 +33,23 @@ namespace llarp
}
void
RCLookupHandler::SetRouterWhitelist(const std::vector<RouterID>& routers)
RCLookupHandler::SetRouterWhitelist(
const std::vector<RouterID>& whitelist, const std::vector<RouterID>& greylist)
{
if (routers.empty())
if (whitelist.empty())
return;
util::Lock l(_mutex);
whitelistRouters.clear();
for (auto& router : routers)
greylistRouters.clear();
for (auto& router : whitelist)
{
whitelistRouters.emplace(router);
}
for (auto& router : greylist)
{
greylistRouters.emplace(router);
}
LogInfo("lokinet service node list now has ", whitelistRouters.size(), " routers");
}
@ -119,7 +125,24 @@ namespace llarp
}
bool
RCLookupHandler::RemoteIsAllowed(const RouterID& remote) const
RCLookupHandler::IsGreylisted(const RouterID& remote) const
{
if (_strictConnectPubkeys.size() && _strictConnectPubkeys.count(remote) == 0
&& !RemoteInBootstrap(remote))
{
return false;
}
if (not useWhitelist)
return false;
util::Lock lock{_mutex};
return greylistRouters.count(remote);
}
bool
RCLookupHandler::PathIsAllowed(const RouterID& remote) const
{
if (_strictConnectPubkeys.size() && _strictConnectPubkeys.count(remote) == 0
&& !RemoteInBootstrap(remote))
@ -135,10 +158,27 @@ namespace llarp
return whitelistRouters.count(remote);
}
bool
RCLookupHandler::SessionIsAllowed(const RouterID& remote) const
{
if (_strictConnectPubkeys.size() && _strictConnectPubkeys.count(remote) == 0
&& !RemoteInBootstrap(remote))
{
return false;
}
if (not useWhitelist)
return true;
util::Lock lock{_mutex};
return whitelistRouters.count(remote) or greylistRouters.count(remote);
}
bool
RCLookupHandler::CheckRC(const RouterContact& rc) const
{
if (not RemoteIsAllowed(rc.pubkey))
if (not SessionIsAllowed(rc.pubkey))
{
_dht->impl->DelRCNodeAsync(dht::Key_t{rc.pubkey});
return false;
@ -189,7 +229,7 @@ namespace llarp
if (newrc.pubkey != oldrc.pubkey)
return false;
if (!RemoteIsAllowed(newrc.pubkey))
if (!SessionIsAllowed(newrc.pubkey))
return false;
auto func = std::bind(&RCLookupHandler::CheckRC, this, newrc);
@ -332,7 +372,7 @@ namespace llarp
return;
}
if (not RemoteIsAllowed(remote))
if (not SessionIsAllowed(remote))
{
FinalizeRequest(remote, &results[0], RCRequestResult::InvalidRouter);
return;

View file

@ -41,7 +41,9 @@ namespace llarp
RemoveValidRouter(const RouterID& router) override EXCLUDES(_mutex);
void
SetRouterWhitelist(const std::vector<RouterID>& routers) override EXCLUDES(_mutex);
SetRouterWhitelist(
const std::vector<RouterID>& whitelist, const std::vector<RouterID>& greylist) override
EXCLUDES(_mutex);
bool
HaveReceivedWhitelist() const override;
@ -51,7 +53,13 @@ namespace llarp
EXCLUDES(_mutex);
bool
RemoteIsAllowed(const RouterID& remote) const override EXCLUDES(_mutex);
PathIsAllowed(const RouterID& remote) const override EXCLUDES(_mutex);
bool
SessionIsAllowed(const RouterID& remote) const override EXCLUDES(_mutex);
bool
IsGreylisted(const RouterID& remote) const override EXCLUDES(_mutex);
bool
CheckRC(const RouterContact& rc) const override;
@ -84,6 +92,13 @@ namespace llarp
bool useWhitelist_arg,
bool isServiceNode_arg);
std::unordered_set<RouterID>
Whitelist() const
{
util::Lock lock{_mutex};
return whitelistRouters;
}
private:
void
HandleDHTLookupResult(RouterID remote, const std::vector<RouterContact>& results);
@ -120,6 +135,7 @@ namespace llarp
bool isServiceNode = false;
std::unordered_set<RouterID> whitelistRouters GUARDED_BY(_mutex);
std::unordered_set<RouterID> greylistRouters GUARDED_BY(_mutex);
using TimePoint = std::chrono::steady_clock::time_point;
std::unordered_map<RouterID, TimePoint> _routerLookupTimes;

View file

@ -63,7 +63,6 @@ namespace llarp
#else
, _randomStartDelay(std::chrono::seconds((llarp::randint() % 30) + 10))
#endif
, m_lokidRpcClient(std::make_shared<rpc::LokidRpcClient>(m_lmq, this))
{
m_keyManager = std::make_shared<KeyManager>();
// for lokid, so we don't close the connection when syncing the whitelist
@ -290,7 +289,10 @@ namespace llarp
auto& conf = *m_Config;
whitelistRouters = conf.lokid.whitelistRouters;
if (whitelistRouters)
{
lokidRPCAddr = oxenmq::address(conf.lokid.lokidRPCAddr);
m_lokidRpcClient = std::make_shared<rpc::LokidRpcClient>(m_lmq, weak_from_this());
}
enableRPCServer = conf.api.m_enableRPCServer;
if (enableRPCServer)
@ -375,21 +377,27 @@ namespace llarp
}
bool
Router::LooksDeregistered() const
Router::LooksDecommissioned() const
{
return IsServiceNode() and whitelistRouters and _rcLookupHandler.HaveReceivedWhitelist()
and not _rcLookupHandler.RemoteIsAllowed(pubkey());
and _rcLookupHandler.IsGreylisted(pubkey());
}
bool
Router::ConnectionToRouterAllowed(const RouterID& router) const
Router::SessionToRouterAllowed(const RouterID& router) const
{
if (LooksDeregistered())
return _rcLookupHandler.SessionIsAllowed(router);
}
bool
Router::PathToRouterAllowed(const RouterID& router) const
{
if (LooksDecommissioned())
{
// we are deregistered don't allow any connections outbound at all
// we are decom'd don't allow any paths outbound at all
return false;
}
return _rcLookupHandler.RemoteIsAllowed(router);
return _rcLookupHandler.PathIsAllowed(router);
}
size_t
@ -748,7 +756,7 @@ namespace llarp
ss << " snode | known/svc/clients: " << nodedb()->NumLoaded() << "/"
<< NumberOfConnectedRouters() << "/" << NumberOfConnectedClients() << " | "
<< pathContext().CurrentTransitPaths() << " active paths | "
<< "block " << m_lokidRpcClient->BlockHeight();
<< "block " << (m_lokidRpcClient ? m_lokidRpcClient->BlockHeight() : 0);
}
else
{
@ -780,7 +788,7 @@ namespace llarp
const bool gotWhitelist = _rcLookupHandler.HaveReceivedWhitelist();
const bool isSvcNode = IsServiceNode();
const bool looksDeregistered = LooksDeregistered();
const bool decom = LooksDecommissioned();
if (_rc.ExpiresSoon(now, std::chrono::milliseconds(randint() % 10000))
|| (now - _rc.last_updated) > rcRegenInterval)
@ -789,8 +797,10 @@ namespace llarp
if (!UpdateOurRC(false))
LogError("Failed to update our RC");
}
else if (not looksDeregistered)
else if (whitelistRouters and gotWhitelist and _rcLookupHandler.SessionIsAllowed(pubkey()))
{
// if we have the whitelist enabled, we have fetched the list and we are in either
// the white or grey list, we want to gossip our RC
GossipRCIfNeeded(_rc);
}
// remove RCs for nodes that are no longer allowed by network policy
@ -815,7 +825,7 @@ namespace llarp
// the whitelist enabled and we got the whitelist
// check against the whitelist and remove if it's not
// in the whitelist OR if there is no whitelist don't remove
return not _rcLookupHandler.RemoteIsAllowed(rc.pubkey);
return not _rcLookupHandler.SessionIsAllowed(rc.pubkey);
});
// find all deregistered relays
@ -827,7 +837,7 @@ namespace llarp
if (not session)
return;
const auto pk = session->GetPubKey();
if (session->IsRelay() and not _rcLookupHandler.RemoteIsAllowed(pk))
if (session->IsRelay() and not _rcLookupHandler.SessionIsAllowed(pk))
{
closePeers.emplace(pk);
}
@ -860,7 +870,7 @@ namespace llarp
const int interval = isSvcNode ? 5 : 2;
const auto timepoint_now = Clock_t::now();
if (timepoint_now >= m_NextExploreAt and not looksDeregistered)
if (timepoint_now >= m_NextExploreAt and not decom)
{
_rcLookupHandler.ExploreNetwork();
m_NextExploreAt = timepoint_now + std::chrono::seconds(interval);
@ -872,13 +882,8 @@ namespace llarp
connectToNum = strictConnect;
}
if (looksDeregistered)
if (decom)
{
// kill all sessions that are open because we think we are deregistered
_linkManager.ForEachPeer([](auto* peer) {
if (peer)
peer->Close();
});
// complain about being deregistered
LogError("We are running as a service node but we seem to be decommissioned");
}
@ -1032,9 +1037,10 @@ namespace llarp
}
void
Router::SetRouterWhitelist(const std::vector<RouterID> routers)
Router::SetRouterWhitelist(
const std::vector<RouterID>& whitelist, const std::vector<RouterID>& greylist)
{
_rcLookupHandler.SetRouterWhitelist(routers);
_rcLookupHandler.SetRouterWhitelist(whitelist, greylist);
}
bool
@ -1181,6 +1187,75 @@ namespace llarp
#if defined(WITH_SYSTEMD)
::sd_notify(0, "READY=1");
#endif
if (whitelistRouters)
{
// do service node testing if we are in service node whitelist mode
_loop->call_every(consensus::REACHABILITY_TESTING_TIMER_INTERVAL, weak_from_this(), [this] {
// dont run tests if we are not running or we are stopping
if (not _running)
return;
// dont run tests if we are decommissioned
if (LooksDecommissioned())
return;
auto tests = m_routerTesting.get_failing();
if (auto maybe = m_routerTesting.next_random(this))
{
tests.emplace_back(*maybe, 0);
}
for (const auto& [router, fails] : tests)
{
if (not SessionToRouterAllowed(router))
{
LogDebug(
router,
" is no longer a registered service node so we remove it from the testing list");
m_routerTesting.remove_node_from_failing(router);
continue;
}
LogDebug("Establishing session to ", router, " for SN testing");
// try to make a session to this random router
// this will do a dht lookup if needed
_outboundSessionMaker.CreateSessionTo(
router, [previous_fails = fails, this](const auto& router, const auto result) {
auto rpc = RpcClient();
if (result != SessionResult::Establish)
{
// failed connection mark it as so
m_routerTesting.add_failing_node(router, previous_fails);
LogInfo(
"FAILED SN connection test to ",
router,
" (",
previous_fails + 1,
" consecutive failures)");
}
else
{
m_routerTesting.remove_node_from_failing(router);
if (previous_fails > 0)
{
LogInfo(
"Successful SN connection test to ",
router,
" after ",
previous_fails,
" failures");
}
else
{
LogDebug("Successful SN connection test to ", router);
}
}
if (rpc)
{
// inform as needed
rpc->InformConnection(router, result == SessionResult::Establish);
}
});
}
});
}
LogContext::Instance().DropToRuntimeLevel();
return _running;
}
@ -1314,7 +1389,7 @@ namespace llarp
return false;
}
if (!_rcLookupHandler.RemoteIsAllowed(rc.pubkey))
if (not _rcLookupHandler.SessionIsAllowed(rc.pubkey))
{
return false;
}

View file

@ -131,7 +131,14 @@ namespace llarp
ModifyOurRC(std::function<std::optional<RouterContact>(RouterContact)> modify) override;
void
SetRouterWhitelist(const std::vector<RouterID> routers) override;
SetRouterWhitelist(
const std::vector<RouterID>& whitelist, const std::vector<RouterID>& greylist) override;
std::unordered_set<RouterID>
GetRouterWhitelist() const override
{
return _rcLookupHandler.Whitelist();
}
exit::Context&
exitContext() override
@ -181,9 +188,9 @@ namespace llarp
void
QueueDiskIO(std::function<void(void)> func) override;
/// return true if we look like we are a deregistered service node
/// return true if we look like we are a decommissioned service node
bool
LooksDeregistered() const;
LooksDecommissioned() const;
std::optional<SockAddr> _ourAddress;
@ -392,7 +399,9 @@ namespace llarp
EnsureEncryptionKey();
bool
ConnectionToRouterAllowed(const RouterID& router) const override;
SessionToRouterAllowed(const RouterID& router) const override;
bool
PathToRouterAllowed(const RouterID& router) const override;
void
HandleSaveRC() const;
@ -536,6 +545,8 @@ namespace llarp
uint32_t path_build_count = 0;
consensus::reachability_testing m_routerTesting;
bool
ShouldReportStats(llarp_time_t now) const;

View file

@ -32,8 +32,8 @@ namespace llarp
}
}
LokidRpcClient::LokidRpcClient(LMQ_ptr lmq, AbstractRouter* r)
: m_lokiMQ(std::move(lmq)), m_Router(r)
LokidRpcClient::LokidRpcClient(LMQ_ptr lmq, std::weak_ptr<AbstractRouter> r)
: m_lokiMQ{std::move(lmq)}, m_Router{std::move(r)}
{
// m_lokiMQ->log_level(toLokiMQLogLevel(LogLevel::Instance().curLevel));
@ -51,18 +51,24 @@ namespace llarp
void
LokidRpcClient::ConnectAsync(oxenmq::address url)
{
if (not m_Router->IsServiceNode())
if (auto router = m_Router.lock())
{
throw std::runtime_error("we cannot talk to lokid while not a service node");
if (not router->IsServiceNode())
{
throw std::runtime_error("we cannot talk to lokid while not a service node");
}
LogInfo("connecting to lokid via LMQ at ", url);
m_Connection = m_lokiMQ->connect_remote(
url,
[self = shared_from_this()](oxenmq::ConnectionID) { self->Connected(); },
[self = shared_from_this(), url](oxenmq::ConnectionID, std::string_view f) {
llarp::LogWarn("Failed to connect to lokid: ", f);
if (auto router = self->m_Router.lock())
{
router->loop()->call([self, url]() { self->ConnectAsync(url); });
}
});
}
LogInfo("connecting to lokid via LMQ at ", url);
m_Connection = m_lokiMQ->connect_remote(
url,
[self = shared_from_this()](oxenmq::ConnectionID) { self->Connected(); },
[self = shared_from_this(), url](oxenmq::ConnectionID, std::string_view f) {
llarp::LogWarn("Failed to connect to lokid: ", f);
self->m_Router->loop()->call([self, url]() { self->ConnectAsync(url); });
});
}
void
@ -89,25 +95,25 @@ namespace llarp
}
catch (std::exception& ex)
{
LogError("bad block hieght: ", ex.what());
LogError("bad block height: ", ex.what());
return; // bail
}
LogDebug("new block at hieght ", m_BlockHeight);
LogDebug("new block at height ", m_BlockHeight);
// don't upadate on block notification if an update is pending
if (not m_UpdatingList)
UpdateServiceNodeList(std::string{msg.data[1]});
UpdateServiceNodeList();
}
void
LokidRpcClient::UpdateServiceNodeList(std::string topblock)
LokidRpcClient::UpdateServiceNodeList()
{
nlohmann::json request, fields;
fields["pubkey_ed25519"] = true;
fields["service_node_pubkey"] = true;
fields["funded"] = true;
fields["active"] = true;
request["fields"] = fields;
request["active_only"] = true;
if (not topblock.empty())
request["poll_block_hash"] = topblock;
m_UpdatingList = true;
Request(
"rpc.get_service_nodes",
@ -161,7 +167,7 @@ namespace llarp
};
m_lokiMQ->add_timer(makePingRequest, PingInterval);
// initial fetch of service node list
UpdateServiceNodeList("");
UpdateServiceNodeList();
}
void
@ -179,8 +185,8 @@ namespace llarp
}
}
}
std::vector<RouterID> nodeList;
std::unordered_map<RouterID, PubKey> keymap;
std::vector<RouterID> activeNodeList, nonActiveNodeList;
{
const auto itr = j.find("service_node_states");
if (itr != j.end() and itr->is_array())
@ -190,22 +196,81 @@ namespace llarp
const auto ed_itr = j_itr->find("pubkey_ed25519");
if (ed_itr == j_itr->end() or not ed_itr->is_string())
continue;
const auto svc_itr = j_itr->find("service_node_pubkey");
if (svc_itr == j_itr->end() or not svc_itr->is_string())
continue;
const auto funded_itr = j_itr->find("funded");
if (funded_itr == j_itr->end() or not funded_itr->is_boolean())
continue;
const auto active_itr = j_itr->find("active");
if (active_itr == j_itr->end() or not active_itr->is_boolean())
continue;
const bool active = active_itr->get<bool>();
const bool funded = funded_itr->get<bool>();
if (not funded)
continue;
RouterID rid;
if (rid.FromHex(ed_itr->get<std::string>()))
nodeList.emplace_back(std::move(rid));
PubKey pk;
if (rid.FromHex(ed_itr->get<std::string>()) and pk.FromHex(svc_itr->get<std::string>()))
{
keymap[rid] = pk;
if (active)
activeNodeList.emplace_back(std::move(rid));
else
nonActiveNodeList.emplace_back(std::move(rid));
}
}
}
}
if (nodeList.empty())
if (activeNodeList.empty())
{
LogWarn("got empty service node list, ignoring.");
return;
}
// inform router about the new list
m_Router->loop()->call([r = m_Router, nodeList = std::move(nodeList)]() mutable {
r->SetRouterWhitelist(std::move(nodeList));
});
if (auto router = m_Router.lock())
{
auto& loop = router->loop();
loop->call([this,
active = std::move(activeNodeList),
inactive = std::move(nonActiveNodeList),
keymap = std::move(keymap),
router = std::move(router)]() mutable {
m_KeyMap = std::move(keymap);
router->SetRouterWhitelist(active, inactive);
});
}
else
LogWarn("Cannot update whitelist: router object has gone away");
}
void
LokidRpcClient::InformConnection(RouterID router, bool success)
{
if (auto r = m_Router.lock())
{
r->loop()->call([router, success, this]() {
if (auto itr = m_KeyMap.find(router); itr != m_KeyMap.end())
{
const nlohmann::json request = {
{"passed", success}, {"pubkey", itr->second.ToHex()}, {"type", "lokinet"}};
Request(
"admin.report_peer_status",
[self = shared_from_this()](bool success, std::vector<std::string>) {
if (not success)
{
LogError("Failed to report connection status to oxend");
return;
}
LogDebug("reported connection status to core");
},
request.dump());
}
});
}
}
SecretKey
@ -261,7 +326,7 @@ namespace llarp
const nlohmann::json req{{"type", 2}, {"name_hash", namehash.ToHex()}};
Request(
"rpc.lns_resolve",
[r = m_Router, resultHandler](bool success, std::vector<std::string> data) {
[this, resultHandler](bool success, std::vector<std::string> data) {
std::optional<service::EncryptedName> maybe = std::nullopt;
if (success)
{
@ -285,8 +350,11 @@ namespace llarp
LogError("failed to parse response from lns lookup: ", ex.what());
}
}
r->loop()->call(
[resultHandler, maybe = std::move(maybe)]() { resultHandler(std::move(maybe)); });
if (auto r = m_Router.lock())
{
r->loop()->call(
[resultHandler, maybe = std::move(maybe)]() { resultHandler(std::move(maybe)); });
}
},
req.dump());
}
@ -300,65 +368,66 @@ namespace llarp
LogInfo(" :", str);
}
assert(m_Router != nullptr);
if (not m_Router->peerDb())
if (auto router = m_Router.lock())
{
LogWarn("HandleGetPeerStats called when router has no peerDb set up.");
// TODO: this can sometimes occur if lokid hits our API before we're done configuring
// (mostly an issue in a loopback testnet)
msg.send_reply("EAGAIN");
return;
}
try
{
// msg.data[0] is expected to contain a bt list of router ids (in our preferred string
// format)
if (msg.data.empty())
if (not router->peerDb())
{
LogWarn("lokid requested peer stats with no request body");
msg.send_reply("peer stats request requires list of router IDs");
LogWarn("HandleGetPeerStats called when router has no peerDb set up.");
// TODO: this can sometimes occur if lokid hits our API before we're done configuring
// (mostly an issue in a loopback testnet)
msg.send_reply("EAGAIN");
return;
}
std::vector<std::string> routerIdStrings;
oxenmq::bt_deserialize(msg.data[0], routerIdStrings);
std::vector<RouterID> routerIds;
routerIds.reserve(routerIdStrings.size());
for (const auto& routerIdString : routerIdStrings)
try
{
RouterID id;
if (not id.FromString(routerIdString))
// msg.data[0] is expected to contain a bt list of router ids (in our preferred string
// format)
if (msg.data.empty())
{
LogWarn("lokid sent us an invalid router id: ", routerIdString);
msg.send_reply("Invalid router id");
LogWarn("lokid requested peer stats with no request body");
msg.send_reply("peer stats request requires list of router IDs");
return;
}
routerIds.push_back(std::move(id));
std::vector<std::string> routerIdStrings;
oxenmq::bt_deserialize(msg.data[0], routerIdStrings);
std::vector<RouterID> routerIds;
routerIds.reserve(routerIdStrings.size());
for (const auto& routerIdString : routerIdStrings)
{
RouterID id;
if (not id.FromString(routerIdString))
{
LogWarn("lokid sent us an invalid router id: ", routerIdString);
msg.send_reply("Invalid router id");
return;
}
routerIds.push_back(std::move(id));
}
auto statsList = router->peerDb()->listPeerStats(routerIds);
int32_t bufSize =
256 + (statsList.size() * 1024); // TODO: tune this or allow to grow dynamically
auto buf = std::unique_ptr<uint8_t[]>(new uint8_t[bufSize]);
llarp_buffer_t llarpBuf(buf.get(), bufSize);
PeerStats::BEncodeList(statsList, &llarpBuf);
msg.send_reply(
std::string_view((const char*)llarpBuf.base, llarpBuf.cur - llarpBuf.base));
}
catch (const std::exception& e)
{
LogError("Failed to handle get_peer_stats request: ", e.what());
msg.send_reply("server error");
}
auto statsList = m_Router->peerDb()->listPeerStats(routerIds);
int32_t bufSize =
256 + (statsList.size() * 1024); // TODO: tune this or allow to grow dynamically
auto buf = std::unique_ptr<uint8_t[]>(new uint8_t[bufSize]);
llarp_buffer_t llarpBuf(buf.get(), bufSize);
PeerStats::BEncodeList(statsList, &llarpBuf);
msg.send_reply(std::string_view((const char*)llarpBuf.base, llarpBuf.cur - llarpBuf.base));
}
catch (const std::exception& e)
{
LogError("Failed to handle get_peer_stats request: ", e.what());
msg.send_reply("server error");
}
}
} // namespace rpc
} // namespace llarp

View file

@ -19,7 +19,7 @@ namespace llarp
/// The LokidRpcClient uses loki-mq to talk to make API requests to lokid.
struct LokidRpcClient : public std::enable_shared_from_this<LokidRpcClient>
{
explicit LokidRpcClient(LMQ_ptr lmq, AbstractRouter* r);
explicit LokidRpcClient(LMQ_ptr lmq, std::weak_ptr<AbstractRouter> r);
/// Connect to lokid async
void
@ -42,6 +42,10 @@ namespace llarp
dht::Key_t namehash,
std::function<void(std::optional<service::EncryptedName>)> resultHandler);
/// inform that if connected to a router successfully
void
InformConnection(RouterID router, bool success);
private:
/// called when we have connected to lokid via lokimq
void
@ -52,7 +56,7 @@ namespace llarp
Command(std::string_view cmd);
void
UpdateServiceNodeList(std::string topblock);
UpdateServiceNodeList();
template <typename HandlerFunc_t, typename Args_t>
void
@ -82,9 +86,11 @@ namespace llarp
std::optional<oxenmq::ConnectionID> m_Connection;
LMQ_ptr m_lokiMQ;
AbstractRouter* const m_Router;
std::weak_ptr<AbstractRouter> m_Router;
std::atomic<bool> m_UpdatingList;
std::unordered_map<RouterID, PubKey> m_KeyMap;
uint64_t m_BlockHeight;
};