diff --git a/contrib/py/pyllarp/hive.py b/contrib/py/pyllarp/hive.py index 59eb7ab5c..47fbc5dd9 100644 --- a/contrib/py/pyllarp/hive.py +++ b/contrib/py/pyllarp/hive.py @@ -8,107 +8,189 @@ from socket import AF_INET, htons, inet_aton from pprint import pprint import sys from argparse import ArgumentParser as ap +import threading +from collections import deque -tmpdir = "/tmp/lokinet_hive" +class RouterHive(object): -def RemoveTmpDir(dirname): - if dirname.startswith("/tmp/") and len(dirname) > 5: - print("calling rmdir -r %s" % dirname) - if (input("Is this ok? (y/n): ").lower().strip()[:1] == "y"): - rmtree(dirname, ignore_errors=True) - return True - else: - print("not removing dir %s because it doesn't start with /tmp/" % dirname) - - return False + def __init__(self, n_relays=10, n_clients=10, netid="hive"): -endpointName = "pyllarp" + self.endpointName = "pyllarp" + self.tmpdir = "/tmp/lokinet_hive" + self.netid = netid -def MakeEndpoint(router, after): - if router.IsRelay(): - return - ep = pyllarp.Endpoint(endpointName, router) - router.AddEndpoint(ep) - if after is not None: - router.CallSafe(lambda : after(ep)) + self.n_relays = n_relays + self.n_clients = n_clients -def AddRelay(hive, index, netid="hive"): - dirname = "%s/relays/%d" % (tmpdir, index) - makedirs("%s/netdb" % dirname, exist_ok=True) + self.addrs = [] + self.events = deque() - config = pyllarp.Config() + self.hive = None - port = index + 30000 - tunname = "lokihive%d" % index + pyllarp.EnableDebug() + if not self.RemoveTmpDir(): + raise RuntimeError("Failed to initialize Router Hive") - config.router.encryptionKeyfile = "%s/encryption.key" % dirname - config.router.transportKeyfile = "%s/transport.key" % dirname - config.router.identKeyfile = "%s/identity.key" % dirname - config.router.ourRcFile = "%s/rc.signed" % dirname - config.router.netid = netid - config.router.nickname = "Router%d" % index - config.router.publicOverride = True - config.router.overrideAddress("127.0.0.1", '{}'.format(port)) - """ - config.router.ip4addr.sin_family = AF_INET - config.router.ip4addr.sin_port = htons(port) - config.router.ip4addr.sin_addr.set("127.0.0.1") - """ - config.router.blockBogons = False + def RemoveTmpDir(self): + if self.tmpdir.startswith("/tmp/") and len(self.tmpdir) > 5: + print("calling rmdir -r %s" % self.tmpdir) + if (input("Is this ok? (y/n): ").lower().strip()[:1] == "y"): + rmtree(self.tmpdir, ignore_errors=True) + return True + else: + print("not removing dir %s because it doesn't start with /tmp/" % self.tmpdir) - config.network.enableProfiling = False - config.network.routerProfilesFile = "%s/profiles.dat" % dirname - config.network.netConfig = {"type": "null"} + return False - config.netdb.nodedbDir = "%s/netdb" % dirname + def MakeEndpoint(self, router, after): + if router.IsRelay(): + return + ep = pyllarp.Endpoint(self.endpointName, router) + router.AddEndpoint(ep) + if after is not None: + router.CallSafe(lambda : after(ep)) - config.links.InboundLinks = [("lo", AF_INET, port, set())] + def AddRelay(self, index): + dirname = "%s/relays/%d" % (self.tmpdir, index) + makedirs("%s/netdb" % dirname, exist_ok=True) - config.system.pidfile = "%s/lokinet.pid" % dirname + config = pyllarp.Config() - config.dns.netConfig = {"local-dns": ("127.3.2.1:%d" % port)} + port = index + 30000 + tunname = "lokihive%d" % index - if index != 1: - config.bootstrap.routers = ["%s/relays/1/rc.signed" % tmpdir] + config.router.encryptionKeyfile = "%s/encryption.key" % dirname + config.router.transportKeyfile = "%s/transport.key" % dirname + config.router.identKeyfile = "%s/identity.key" % dirname + config.router.ourRcFile = "%s/rc.signed" % dirname + config.router.netid = self.netid + config.router.nickname = "Router%d" % index + config.router.publicOverride = True + config.router.overrideAddress("127.0.0.1", '{}'.format(port)) + """ + config.router.ip4addr.sin_family = AF_INET + config.router.ip4addr.sin_port = htons(port) + config.router.ip4addr.sin_addr.set("127.0.0.1") + """ + config.router.blockBogons = False - hive.AddRelay(config) + config.network.enableProfiling = False + config.network.routerProfilesFile = "%s/profiles.dat" % dirname + config.network.netConfig = {"type": "null"} + + config.netdb.nodedbDir = "%s/netdb" % dirname + + config.links.InboundLinks = [("lo", AF_INET, port, set())] + + config.system.pidfile = "%s/lokinet.pid" % dirname + + config.dns.netConfig = {"local-dns": ("127.3.2.1:%d" % port)} + + if index != 1: + config.bootstrap.routers = ["%s/relays/1/rc.signed" % self.tmpdir] + + self.hive.AddRelay(config) -def AddClient(hive, index, netid="hive"): - dirname = "%s/clients/%d" % (tmpdir, index) - makedirs("%s/netdb" % dirname, exist_ok=True) + def AddClient(self, index): + dirname = "%s/clients/%d" % (self.tmpdir, index) + makedirs("%s/netdb" % dirname, exist_ok=True) - config = pyllarp.Config() + config = pyllarp.Config() - port = index + 40000 - tunname = "lokihive%d" % index + port = index + 40000 + tunname = "lokihive%d" % index - config.router.encryptionKeyfile = "%s/encryption.key" % dirname - config.router.transportKeyfile = "%s/transport.key" % dirname - config.router.identKeyfile = "%s/identity.key" % dirname - config.router.ourRcFile = "%s/rc.signed" % dirname - config.router.netid = netid - config.router.blockBogons = False + config.router.encryptionKeyfile = "%s/encryption.key" % dirname + config.router.transportKeyfile = "%s/transport.key" % dirname + config.router.identKeyfile = "%s/identity.key" % dirname + config.router.ourRcFile = "%s/rc.signed" % dirname + config.router.netid = self.netid + config.router.blockBogons = False - config.network.enableProfiling = False - config.network.routerProfilesFile = "%s/profiles.dat" % dirname - config.network.netConfig = {"type": "null"} + config.network.enableProfiling = False + config.network.routerProfilesFile = "%s/profiles.dat" % dirname + config.network.netConfig = {"type": "null"} - config.netdb.nodedbDir = "%s/netdb" % dirname + config.netdb.nodedbDir = "%s/netdb" % dirname - config.system.pidfile = "%s/lokinet.pid" % dirname + config.system.pidfile = "%s/lokinet.pid" % dirname - config.dns.netConfig = {"local-dns": ("127.3.2.1:%d" % port)} + config.dns.netConfig = {"local-dns": ("127.3.2.1:%d" % port)} - config.bootstrap.routers = ["%s/relays/1/rc.signed" % tmpdir] + config.bootstrap.routers = ["%s/relays/1/rc.signed" % self.tmpdir] - hive.AddClient(config) + self.hive.AddClient(config) + + def onGotEndpoint(self, ep): + addr = ep.OurAddress() + self.addrs.append(pyllarp.ServiceAddress(addr)) + + def sendToAddress(self, router, toaddr, pkt): + if router.IsRelay(): + return + if router.TrySendPacket("default", toaddr, pkt): + print("sending {} bytes to {}".format(len(pkt), toaddr)) + + def broadcastTo(self, addr, pkt): + self.hive.ForEachRouter(lambda r : sendToAddress(r, addr, pkt)) + + def InitFirstRC(self): + print("Starting first router to init its RC for bootstrap") + self.hive = pyllarp.RouterHive() + self.AddRelay(1) + self.hive.StartRelays() + print("sleeping 2 sec to give plenty of time to save bootstrap rc") + sleep(2) + + self.hive.StopAll() + + def Start(self): + + self.InitFirstRC() + + print("Resetting hive. Creating %d relays and %d clients" % (self.n_relays, self.n_clients)) + + self.hive = pyllarp.RouterHive() + + for i in range(1, self.n_relays + 1): + self.AddRelay(i) + + for i in range(1, self.n_clients + 1): + self.AddClient(i) + + print("Starting relays") + self.hive.StartRelays() + + sleep(1) + self.hive.ForEachRelay(lambda r: self.MakeEndpoint(r, self.onGotEndpoint)) + + print("Sleeping 5 seconds before starting clients") + sleep(5) + + self.hive.StartClients() + + sleep(1) + self.hive.ForEachClient(lambda r: self.MakeEndpoint(r, self.onGotEndpoint)) + + def Stop(self): + self.hive.StopAll() + + def CollectNextEvent(self): + self.events.append(self.hive.GetNextEvent()) + + def CollectAllEvents(self): + self.events.extend(self.hive.GetAllEvents()) + + def PopEvent(self): + self.CollectAllEvents() + if len(self.events): + return self.events.popleft() + return None def main(n_relays=10, n_clients=10, print_each_event=True): - pyllarp.EnableDebug() + running = True - if not RemoveTmpDir(tmpdir): - return def handle_sigint(sig, frame): nonlocal running @@ -117,60 +199,18 @@ def main(n_relays=10, n_clients=10, print_each_event=True): signal(SIGINT, handle_sigint) - hive = pyllarp.RouterHive() - AddRelay(hive, 1) - hive.StartRelays() - print("sleeping 2 sec to give plenty of time to save bootstrap rc") - for i in range(2): - print(i+1) - sleep(1) - - print("Resetting hive. Creating %d relays and %d clients" % (n_relays, n_clients)) - hive.StopAll() - - hive = pyllarp.RouterHive() - - addrs = [] - - def onGotEndpoint(ep): - addr = ep.OurAddress() - addrs.append(pyllarp.ServiceAddress(addr)) - - def sendToAddress(router, toaddr, pkt): - if router.IsRelay(): - return - if router.TrySendPacket("default", toaddr, pkt): - print("sending {} bytes to {}".format(len(pkt), toaddr)) - - def broadcastTo(addr, pkt): - hive.ForEachRouter(lambda r : sendToAddress(r, addr, pkt)) - - for i in range(1, n_relays + 1): - AddRelay(hive, i) - - for i in range(1, n_clients + 1): - AddClient(hive, i) - - print("Starting relays") - hive.StartRelays() - - sleep(1) - hive.ForEachRelay(lambda r: MakeEndpoint(r, onGotEndpoint)) - - print("Sleeping 5 seconds before starting clients") - sleep(5) - - hive.StartClients() - - sleep(1) - hive.ForEachClient(lambda r: MakeEndpoint(r, onGotEndpoint)) - + try: + hive = RouterHive(n_relays, n_clients) + hive.Start() + except Exception as err: + print(err) + return 1 total_events = 0 event_counts = dict() while running: - event = hive.GetNextEvent() + event = hive.PopEvent() event_name = event.__class__.__name__ if event: if print_each_event: @@ -190,8 +230,10 @@ def main(n_relays=10, n_clients=10, print_each_event=True): if total_events % 10 == 0: pprint(event_counts) + sleep(.01) + print('stopping') - hive.StopAll() + hive.Stop() print('stopped') del hive diff --git a/llarp/tooling/router_hive.cpp b/llarp/tooling/router_hive.cpp index 9bb3dd370..18f406f44 100644 --- a/llarp/tooling/router_hive.cpp +++ b/llarp/tooling/router_hive.cpp @@ -105,7 +105,7 @@ namespace tooling { std::lock_guard guard{eventQueueMutex}; - eventQueue.push(std::move(event)); + eventQueue.push_back(std::move(event)); } RouterEventPtr @@ -116,12 +116,25 @@ namespace tooling if (not eventQueue.empty()) { auto ptr = std::move(eventQueue.front()); - eventQueue.pop(); + eventQueue.pop_front(); return ptr; } return nullptr; } + std::deque + RouterHive::GetAllEvents() + { + std::lock_guard guard{eventQueueMutex}; + + std::deque events; + if (not eventQueue.empty()) + { + eventQueue.swap(events); + } + return events; + } + void RouterHive::VisitRouter(llarp_main *router, std::function visit) { diff --git a/llarp/tooling/router_hive.hpp b/llarp/tooling/router_hive.hpp index 6d179e3cb..25ae41d17 100644 --- a/llarp/tooling/router_hive.hpp +++ b/llarp/tooling/router_hive.hpp @@ -6,7 +6,7 @@ #include #include -#include +#include #include #include @@ -69,6 +69,9 @@ namespace tooling RouterEventPtr GetNextEvent(); + std::deque + GetAllEvents(); + void ForEachRelay(std::function visit) { @@ -95,16 +98,13 @@ namespace tooling ForEachClient(visit); } - - - std::vector relays; std::vector clients; std::vector routerMainThreads; std::mutex eventQueueMutex; - std::queue eventQueue; + std::deque eventQueue; }; } // namespace tooling diff --git a/pybind/llarp/tooling/router_event.cpp b/pybind/llarp/tooling/router_event.cpp index bd71a5ba6..08230112c 100644 --- a/pybind/llarp/tooling/router_event.cpp +++ b/pybind/llarp/tooling/router_event.cpp @@ -1,4 +1,5 @@ #include "common.hpp" +#include "pybind11/stl.h" #include "tooling/router_event.hpp" #include "tooling/dht_event.hpp" diff --git a/pybind/llarp/tooling/router_hive.cpp b/pybind/llarp/tooling/router_hive.cpp index d9726416c..db36ee4f1 100644 --- a/pybind/llarp/tooling/router_hive.cpp +++ b/pybind/llarp/tooling/router_hive.cpp @@ -1,7 +1,9 @@ #include "common.hpp" +#include "pybind11/stl.h" +#include "pybind11/iostream.h" + #include #include "llarp.hpp" -#include "pybind11/iostream.h" namespace tooling { void @@ -18,7 +20,8 @@ namespace tooling .def("ForEachRelay", &RouterHive::ForEachRelay) .def("ForEachClient", &RouterHive::ForEachClient) .def("ForEachRouter", &RouterHive::ForEachRouter) - .def("GetNextEvent", &RouterHive::GetNextEvent); + .def("GetNextEvent", &RouterHive::GetNextEvent) + .def("GetAllEvents", &RouterHive::GetAllEvents); } }