more hive things

DHT PubIntroSentEvent
some helper functions added to RouterHive (C++ class) as well as RouterHive(Python class)
hive.py main() continues to be a testbed for new event types
some more internal classes in pybind
This commit is contained in:
Thomas Winget 2020-03-04 00:57:07 -05:00
parent 6d472d2423
commit 695784b2b6
14 changed files with 196 additions and 7 deletions

View File

@ -2,6 +2,7 @@
#include <service/endpoint.hpp>
#include <dht/context.hpp>
#include <dht/key.hpp>
#include <dht/messages/findintro.hpp>
#include <dht/messages/findrouter.hpp>
#include <dht/messages/gotintro.hpp>
@ -23,6 +24,7 @@
#include <util/meta/memfn.hpp>
#include <hook/shell.hpp>
#include <link/link_manager.hpp>
#include <tooling/dht_event.hpp>
#include <utility>
@ -470,10 +472,13 @@ namespace llarp
// do publishing for each path selected
size_t published = 0;
for(const auto& path : paths)
{
for(size_t i = 0; i < llarp::dht::IntroSetRequestsPerRelay; ++i)
{
auto ev = std::make_unique<tooling::PubIntroSentEvent>(r->pubkey(), llarp::dht::Key_t{introset.derivedSigningKey.as_array()}, RouterID(path->hops[path->hops.size()-1].rc.pubkey), published);
r->NotifyRouterEvent(std::move(ev));
if(PublishIntroSetVia(introset, r, path, published))
published++;
}

View File

@ -5,13 +5,27 @@
namespace tooling
{
PubIntroReceivedEvent::PubIntroReceivedEvent(const llarp::RouterID & ourRouter, const llarp::dht::Key_t & from, const llarp::dht::Key_t & location, uint64_t txid, uint64_t relayOrder) :
RouterEvent("DHT: PubIntroReceivedEvent", ourRouter, false),
RouterEvent("DHT: PubIntroReceivedEvent", ourRouter, true),
From(from),
IntrosetLocation(location),
RelayOrder(relayOrder),
TxID(txid)
{}
PubIntroSentEvent::PubIntroSentEvent(const llarp::RouterID & ourRouter, const llarp::dht::Key_t & introsetPubkey, const llarp::RouterID& relay, uint64_t relayIndex)
: RouterEvent("DHT: PubIntroSentEvent", ourRouter, false)
, introsetPubkey(introsetPubkey)
, relay(relay)
, relayIndex(relayIndex)
{
}
std::string
PubIntroSentEvent::ToString() const
{
return RouterEvent::ToString() + " ---- introset pubkey: " + introsetPubkey.ShortHex() + ", relay: " + relay.ShortString() + ", relayIndex: " + std::to_string(relayIndex);
}
std::string PubIntroReceivedEvent::ToString() const
{
return RouterEvent::ToString() + "from " + From.ShortHex() + " location=" + IntrosetLocation.ShortHex() + " order=" + std::to_string(RelayOrder) + " txid=" + std::to_string(TxID);

View File

@ -7,6 +7,19 @@
namespace tooling
{
struct PubIntroSentEvent : public RouterEvent
{
PubIntroSentEvent(const llarp::RouterID & ourRouter, const llarp::dht::Key_t & introsetPubkey, const llarp::RouterID& relay, uint64_t relayIndex);
llarp::dht::Key_t introsetPubkey;
llarp::RouterID relay;
uint64_t relayIndex;
std::string ToString() const override;
};
struct PubIntroReceivedEvent : public RouterEvent
{
PubIntroReceivedEvent(const llarp::RouterID & ourRouter, const llarp::dht::Key_t & from, const llarp::dht::Key_t & location, uint64_t txid, uint64_t relayOrder);

View File

@ -3,8 +3,10 @@
#include "llarp.h"
#include "llarp.hpp"
#include "util/thread/logic.hpp"
#include "router/abstractrouter.hpp"
#include <chrono>
#include <algorithm>
using namespace std::chrono_literals;
@ -166,4 +168,76 @@ namespace tooling
VisitRouter(clients[index], visit);
}
std::vector<size_t>
RouterHive::RelayConnectedRelays()
{
std::vector<size_t> results;
results.resize(relays.size());
std::mutex results_lock;
size_t i=0;
size_t done_count = 0;
for (auto relay : relays)
{
auto ctx = llarp::Context::Get(relay);
LogicCall(ctx->logic, [&, i, ctx]() {
size_t count = ctx->router->NumberOfConnectedRouters();
std::lock_guard<std::mutex> guard{results_lock};
results[i] = count;
done_count++;
});
i++;
}
while (true)
{
size_t read_done_count = 0;
{
std::lock_guard<std::mutex> guard{results_lock};
read_done_count = done_count;
}
if (read_done_count == relays.size())
break;
std::this_thread::sleep_for(100ms);
}
return results;
}
std::vector<llarp::RouterContact>
RouterHive::GetRelayRCs()
{
std::vector<llarp::RouterContact> results;
results.resize(relays.size());
std::mutex results_lock;
size_t i=0;
size_t done_count = 0;
for (auto relay : relays)
{
auto ctx = llarp::Context::Get(relay);
LogicCall(ctx->logic, [&, i, ctx]() {
llarp::RouterContact rc = ctx->router->rc();
std::lock_guard<std::mutex> guard{results_lock};
results[i] = std::move(rc);
done_count++;
});
i++;
}
while (true)
{
size_t read_done_count = 0;
{
std::lock_guard<std::mutex> guard{results_lock};
read_done_count = done_count;
}
if (read_done_count == relays.size())
break;
std::this_thread::sleep_for(100ms);
}
return results;
}
} // namespace tooling

View File

@ -98,6 +98,12 @@ namespace tooling
ForEachClient(visit);
}
std::vector<size_t>
RelayConnectedRelays();
std::vector<llarp::RouterContact>
GetRelayRCs();
std::vector<llarp_main *> relays;
std::vector<llarp_main *> clients;

View File

@ -5,6 +5,7 @@ set(LLARP_PYBIND_SRC
llarp/router_contact.cpp
llarp/crypto/types.cpp
llarp/config.cpp
llarp/dht/dht_types.cpp
llarp/path/path_types.cpp
llarp/path/path_hop_config.cpp
llarp/handlers/pyhandler.cpp

View File

@ -52,6 +52,12 @@ namespace llarp
void
PathTypes_Init(py::module & mod);
namespace dht
{
void
DHTTypes_Init(py::module& mod);
}
namespace path
{
void

View File

@ -0,0 +1,26 @@
#include <dht/key.hpp>
#include "common.hpp"
#include <pybind11/operators.h>
namespace llarp
{
namespace dht
{
void
DHTTypes_Init(py::module& mod)
{
py::class_< Key_t >(mod, "DHTKey")
.def(py::self == py::self)
.def(py::self < py::self)
.def(py::self ^ py::self)
.def("distance", [](const Key_t* const lhs, const Key_t* const rhs) {
return *lhs ^ *rhs;
})
.def("ShortString", [](const Key_t* const key) {
return llarp::RouterID(key->as_array()).ShortString();
});
}
} // namespace llarp::dht
} // namespace llarp

View File

@ -1,5 +1,7 @@
#include <path/path.hpp>
#include "common.hpp"
#include <pybind11/operators.h>
namespace llarp
{
@ -7,9 +9,7 @@ namespace llarp
PathTypes_Init(py::module & mod)
{
py::class_< PathID_t >(mod, "PathID")
.def("__eq__", [](const PathID_t* const lhs, const PathID_t* const rhs) {
return *lhs == *rhs;
})
.def(py::self == py::self)
.def("ShortHex", &PathID_t::ShortHex)
.def("__str__", &PathID_t::ShortHex)
.def("__repr__", &PathID_t::ShortHex);

View File

@ -1,4 +1,5 @@
#include <router_contact.hpp>
#include <dht/key.hpp>
#include "common.hpp"
namespace llarp
@ -11,6 +12,9 @@ namespace llarp
.def_property_readonly("routerID", [](const RouterContact* const rc) -> llarp::RouterID {
return llarp::RouterID(rc->pubkey);
})
.def_property_readonly("AsDHTKey", [](const RouterContact* const rc) -> llarp::dht::Key_t {
return llarp::dht::Key_t{rc->pubkey.as_array()};
})
.def("ReadFile", &RouterContact::Read)
.def("WriteFile", &RouterContact::Write)
.def("ToString", &RouterContact::ToString)

View File

@ -37,6 +37,11 @@ namespace tooling
return ev->status == llarp::LR_StatusRecord::SUCCESS;
});
py::class_<PubIntroSentEvent, RouterEvent>(mod, "DhtPubIntroSentEvent")
.def_readonly("introsetPubkey", &PubIntroSentEvent::introsetPubkey)
.def_readonly("relay", &PubIntroSentEvent::relay)
.def_readonly("relayIndex", &PubIntroSentEvent::relayIndex);
py::class_<PubIntroReceivedEvent, RouterEvent>(mod, "DhtPubIntroReceivedEvent")
.def_readonly("from", &PubIntroReceivedEvent::From)
.def_readonly("location", &PubIntroReceivedEvent::IntrosetLocation)

View File

@ -21,7 +21,9 @@ namespace tooling
.def("ForEachClient", &RouterHive::ForEachClient)
.def("ForEachRouter", &RouterHive::ForEachRouter)
.def("GetNextEvent", &RouterHive::GetNextEvent)
.def("GetAllEvents", &RouterHive::GetAllEvents);
.def("GetAllEvents", &RouterHive::GetAllEvents)
.def("RelayConnectedRelays", &RouterHive::RelayConnectedRelays)
.def("GetRelayRCs", &RouterHive::GetRelayRCs);
}
}

View File

@ -10,6 +10,7 @@ PYBIND11_MODULE(pyllarp, m)
llarp::CryptoTypes_Init(m);
llarp::Context_Init(m);
llarp::Config_Init(m);
llarp::dht::DHTTypes_Init(m);
llarp::PathTypes_Init(m);
llarp::path::PathHopConfig_Init(m);
llarp::handlers::PyHandler_Init(m);

View File

@ -26,6 +26,7 @@ class RouterHive(object):
self.events = deque()
self.hive = None
self.RCs = []
pyllarp.EnableDebug()
if not self.RemoveTmpDir():
@ -164,8 +165,10 @@ class RouterHive(object):
sleep(0.2)
self.hive.ForEachRelay(lambda r: self.MakeEndpoint(r, self.onGotEndpoint))
print("Sleeping 1 seconds before starting clients")
sleep(1)
print("Sleeping 2 seconds before starting clients")
sleep(2)
self.RCs = self.hive.GetRelayRCs()
self.hive.StartClients()
@ -187,6 +190,17 @@ class RouterHive(object):
return self.events.popleft()
return None
def DistanceSortedRCs(self, dht_key):
rcs = []
distances = []
for rc in self.RCs:
distances.append(rc.AsDHTKey ^ dht_key)
rcs.append(rc)
distances, rcs = (list(t) for t in zip(*sorted(zip(distances, rcs))))
return rcs
def main(n_relays=10, n_clients=10, print_each_event=True):
running = True
@ -206,6 +220,9 @@ def main(n_relays=10, n_clients=10, print_each_event=True):
print(err)
return 1
first_dht_pub = False
dht_pub_sorted = None
dht_pub_location = None
total_events = 0
event_counts = dict()
while running:
@ -231,6 +248,21 @@ def main(n_relays=10, n_clients=10, print_each_event=True):
if total_events % 10 == 0:
pprint(event_counts)
if event_name == "DhtPubIntroReceivedEvent":
if not first_dht_pub:
dht_pub_sorted = hive.DistanceSortedRCs(event.location)
dht_pub_location = event.location
print("location: {} landed at: {}, sorted distance list:".format(dht_pub_location.ShortString(), event.routerID.ShortString()))
print([x.routerID.ShortString() for x in dht_pub_sorted[:4]])
first_dht_pub = True
else:
if event.location == dht_pub_location:
print("location: {}, landed at: {}".format(dht_pub_location.ShortString(), event.routerID.ShortString()))
# won't have printed event count above in this case.
if len(hive.events) == 0:
pprint(event_counts)
hive.events = []
sleep(1)