Merge pull request #1272 from majestrate/exit-nodes-2020-05-16

exit traffic via snapps
Commit 45cda241f0 by Jeff, 2020-06-01 14:42:43 -04:00 (committed via GitHub)
61 changed files with 977 additions and 689 deletions


@ -33,7 +33,7 @@ option(STATIC_LINK "link statically against dependencies" OFF)
option(BUILD_SHARED_LIBS "build lokinet libraries as shared libraries instead of static" ON)
option(SHADOW "use shadow testing framework. linux only" OFF)
option(XSAN "use sanitiser, if your system has it (requires -DCMAKE_BUILD_TYPE=Debug)" OFF)
option(JEMALLOC "use jemalloc. Not required on BSD" OFF)
option(WITH_JEMALLOC "use jemalloc as allocator" OFF)
option(TESTNET "testnet build" OFF)
option(WITH_COVERAGE "generate coverage data" OFF)
option(USE_SHELLHOOKS "enable shell hooks on compile time (dangerous)" OFF)
@ -49,6 +49,12 @@ endif()
include(CheckCXXSourceCompiles)
include(CheckLibraryExists)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_EXTENSIONS OFF)
set(CMAKE_C_STANDARD 99)
set(CMAKE_C_STANDARD_REQUIRED ON)
set(CMAKE_C_EXTENSIONS OFF)
include(cmake/enable_lto.cmake)
include(cmake/target_link_libraries_system.cmake)
@ -80,14 +86,6 @@ list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_LIST_DIR}/cmake")
include(MacroEnsureOutOfSourceBuild)
macro_ensure_out_of_source_build("${PROJECT_NAME} requires an out-of-source build. Create a build directory and run 'cmake ${CMAKE_SOURCE_DIR} [options]'.")
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_EXTENSIONS OFF)
set(CMAKE_C_STANDARD 99)
set(CMAKE_C_STANDARD_REQUIRED ON)
set(CMAKE_C_EXTENSIONS OFF)
# Always build PIC
set(CMAKE_POSITION_INDEPENDENT_CODE ON)
@ -117,9 +115,9 @@ if (NOT CMAKE_SYSTEM_NAME MATCHES "Linux" AND SHADOW)
endif()
if(XSAN)
string(APPEND CMAKE_CXX_FLAGS_DEBUG " -fsanitize=${XSAN} -fno-omit-frame-pointer")
string(APPEND CMAKE_CXX_FLAGS_DEBUG " -fsanitize=${XSAN} -fno-omit-frame-pointer -fno-sanitize-recover")
foreach(type EXE MODULE SHARED STATIC)
string(APPEND CMAKE_${type}_LINKER_FLAGS_DEBUG " -fsanitize=${XSAN} -fno-omit-frame-pointer")
string(APPEND CMAKE_${type}_LINKER_FLAGS_DEBUG " -fsanitize=${XSAN} -fno-omit-frame-pointer -fno-sanitize-recover")
endforeach()
message(STATUS "Doing a ${XSAN} sanitizer build")
endif()
@ -213,15 +211,6 @@ endif()
string(REGEX REPLACE "^fatal.*$" nogit GIT_VERSION_REAL "${GIT_VERSION}")
# HeapAlloc(2) on Windows was significantly revamped in 2009
# but the old algorithm isn't too bad either
# this is _the_ system allocator on BSD UNIX
# openbsd replaced it with a secure/randomised malloc not too
# long ago
if(JEMALLOC)
set(MALLOC_LIB jemalloc)
endif()
find_package(PkgConfig QUIET)
if(PKG_CONFIG_FOUND)

cmake/FindJemalloc.cmake (new file, 57 lines)

@ -0,0 +1,57 @@
#
# Find the JEMALLOC client includes and library
#
# This module defines
# JEMALLOC_INCLUDE_DIR, where to find jemalloc.h
# JEMALLOC_LIBRARIES, the libraries to link against
# JEMALLOC_FOUND, if false, you cannot build anything that requires JEMALLOC
# also defined, but not for general use are
# JEMALLOC_LIBRARY, where to find the JEMALLOC library.
set( JEMALLOC_FOUND 0 )
if ( UNIX )
FIND_PATH( JEMALLOC_INCLUDE_DIR
NAMES
jemalloc/jemalloc.h
PATHS
/usr/include
/usr/include/jemalloc
/usr/local/include
/usr/local/include/jemalloc
$ENV{JEMALLOC_ROOT}
$ENV{JEMALLOC_ROOT}/include
DOC
"Specify include-directories that might contain jemalloc.h here."
)
FIND_LIBRARY( JEMALLOC_LIBRARY
NAMES
jemalloc libjemalloc JEMALLOC
PATHS
/usr/lib
/usr/lib/jemalloc
/usr/local/lib
/usr/local/lib/jemalloc
/usr/local/jemalloc/lib
$ENV{JEMALLOC_ROOT}/lib
$ENV{JEMALLOC_ROOT}
DOC "Specify library-locations that might contain the jemalloc library here."
)
if ( JEMALLOC_LIBRARY )
if ( JEMALLOC_INCLUDE_DIR )
set( JEMALLOC_FOUND 1 )
message( STATUS "Found JEMALLOC library: ${JEMALLOC_LIBRARY}")
message( STATUS "Found JEMALLOC headers: ${JEMALLOC_INCLUDE_DIR}")
else ( JEMALLOC_INCLUDE_DIR )
message(FATAL_ERROR "Could not find jemalloc headers! Please install jemalloc libraries and headers")
endif ( JEMALLOC_INCLUDE_DIR )
endif ( JEMALLOC_LIBRARY )
add_library(jemalloc SHARED IMPORTED)
set_target_properties(jemalloc PROPERTIES
INTERFACE_INCLUDE_DIRECTORIES "${JEMALLOC_INCLUDE_DIR}"
IMPORTED_LOCATION "${JEMALLOC_LIBRARY}")
mark_as_advanced( JEMALLOC_FOUND JEMALLOC_LIBRARY JEMALLOC_EXTRA_LIBRARIES JEMALLOC_INCLUDE_DIR )
endif (UNIX)
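# Usage sketch: the main build consumes this module via find_package and links the
# IMPORTED `jemalloc` target defined above (the target name below is a placeholder):
#   find_package(Jemalloc REQUIRED)
#   target_link_libraries(my_target PUBLIC jemalloc)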


@ -5,6 +5,15 @@ endif()
include(CheckCXXSourceCompiles)
include(CheckLibraryExists)
if(WITH_JEMALLOC)
find_package(Jemalloc REQUIRED)
if(NOT JEMALLOC_FOUND)
message(FATAL_ERROR "did not find jemalloc")
endif()
add_definitions(-DUSE_JEMALLOC)
message(STATUS "using jemalloc")
endif()
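# Example configure invocation enabling this code path (assumed typical out-of-source build):
#   cmake -DWITH_JEMALLOC=ON <path-to-source>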
add_library(curl INTERFACE)
option(DOWNLOAD_CURL "download and statically compile in CURL" OFF)


@ -15,9 +15,11 @@ class Monitor:
def __init__(self, url):
self.data = dict()
self.win = curses.initscr()
curses.start_color()
curses.init_pair(1, curses.COLOR_RED, curses.COLOR_BLACK)
self._url = url
while len(self._globalspeed) < self._speedSamples:
self._globalspeed.append((0, 0))
self._globalspeed.append((0, 0, 0, 0))
def __del__(self):
curses.endwin()
@ -82,6 +84,18 @@ class Monitor:
idx += 1
return "{} {}ps".format("%.2f" % rate, units[idx])
def get_all_paths(self):
for k in self.data['services']:
status = self.data['services'][k]
for path in (status['paths'] or []):
yield path
for s in (status['remoteSessions'] or []):
for path in s['paths']:
yield path
for s in (status['snodeSessions'] or []):
for path in s['paths']:
yield path
def display_service(self, y, name, status):
"""display a service at current position"""
self.win.move(y, 1)
@ -128,27 +142,34 @@ class Monitor:
y += 2
self.win.move(y, 1)
self.win.addstr(
"global speed:\t\t[{}\ttx]\t[{}\trx]".format(
"throughput:\t\t[{}\ttx]\t[{}\trx]".format(
self.speedOf(self.txrate), self.speedOf(self.rxrate)
)
)
bloat_tx, bloat_rx = self.calculate_bloat(self.data['links']['outbound'], self.get_all_paths())
y += 1
self.win.move(y, 1)
self.win.addstr("goodput:\t\t[{}\ttx]\t[{}\trx]".format(self.speedOf(self.txrate-bloat_tx), self.speedOf(self.rxrate-bloat_rx)))
y += 1
self.win.move(y, 1)
self.win.addstr("overhead:\t\t[{}\ttx]\t[{}\trx]".format(self.speedOf(bloat_tx), self.speedOf(bloat_rx)))
self._globalspeed.append((self.txrate, self.rxrate))
self._globalspeed.append((self.txrate, self.rxrate, bloat_tx, bloat_rx))
while len(self._globalspeed) > self._speedSamples:
self._globalspeed.pop(0)
return self.display_speedgraph(y + 2, self._globalspeed)
def display_speedgraph(self, y, samps, maxsz=20):
def display_speedgraph(self, y, samps, maxsz=40):
""" display global speed graph """
def scale(x, n):
while n > 0:
x /= 2
n -= 1
return int(x)
txmax, rxmax = 1000, 1000
for tx, rx in samps:
txmax, rxmax = 1024, 1024
for tx, rx, _tx, _rx in samps:
if tx > txmax:
txmax = tx
if rx > rxmax:
@ -164,13 +185,13 @@ class Monitor:
txscale += 1
txmax /= 2
def makebar(samp, max):
bar = "#" * samp
def makebar(samp, badsamp, max):
bar = "#" * (samp - badsamp)
pad = " " * (max - samp)
return pad, bar
return pad, bar, '#' * badsamp
txlabelpad = int(txmax / 2) - 1
rxlabelpad = int(rxmax / 2) - 1
txlabelpad = int(txmax / 2)# - 1
rxlabelpad = int(rxmax / 2)# - 1
if txlabelpad <= 0:
txlabelpad = 1
if rxlabelpad <= 0:
@ -182,32 +203,77 @@ class Monitor:
self.win.addstr(
"{}tx{}{}rx{}".format(txlabelpad, txlabelpad, rxlabelpad, rxlabelpad)
)
for tx, rx in samps:
for tx, rx, btx, brx in samps:
y += 1
self.win.move(y, 1)
txpad, txbar = makebar(scale(tx, txscale), int(txmax))
rxpad, rxbar = makebar(scale(rx, rxscale), int(rxmax))
self.win.addstr("{}{}|{}{}".format(txpad, txbar, rxbar, rxpad))
txpad, txbar, btxbar = makebar(scale(tx,txscale),scale(btx,txscale), int(txmax))
rxpad, rxbar, brxbar = makebar(scale(rx,rxscale),scale(brx,rxscale), int(rxmax))
self.win.addstr(txpad)
self.win.addstr(btxbar, curses.color_pair(1))
self.win.addstr(txbar)
self.win.addstr('|')
self.win.addstr(rxbar)
self.win.addstr(brxbar, curses.color_pair(1))
self.win.addstr(rxpad)
return y + 2
def calculate_bloat(self, links, paths):
"""
calculate bandwidth overhead
"""
lltx = 0
llrx = 0
tx = 0
rx = 0
for link in links:
sessions = link["sessions"]["established"]
for s in sessions:
lltx += s['tx']
llrx += s['rx']
for path in paths:
tx += path['txRateCurrent']
rx += path['rxRateCurrent']
if lltx > tx:
lltx -= tx
if llrx > rx:
llrx -= rx
return lltx, llrx
def display_link(self, y, link):
y += 1
self.win.move(y, 1)
sessions = link["sessions"]["established"]
for s in sessions:
y += 1
self.win.move(y, 1)
self.txrate += s["txRateCurrent"]
self.rxrate += s["rxRateCurrent"]
self.win.addstr(
"{}\t[{}\ttx]\t[{}\trx]".format(
s["remoteAddr"], self.speedOf(s["txRateCurrent"]), self.speedOf(s["rxRateCurrent"])
)
y = self.display_link_session(y, s)
return y
def display_link_session(self, y, s):
y += 1
self.win.move(y, 1)
self.txrate += s["txRateCurrent"]
self.rxrate += s["rxRateCurrent"]
self.win.addstr(
"{}\t[{}\ttx]\t[{}\trx]".format(
s["remoteAddr"], self.speedOf(s["txRateCurrent"]), self.speedOf(s["rxRateCurrent"])
)
if (s['txMsgQueueSize'] or 0) > 1:
self.win.addstr(" [out window:\t{}]".format(s['txMsgQueueSize']))
)
if (s['txMsgQueueSize'] or 0) > 1:
self.win.addstr(" [out window: {}]".format(s['txMsgQueueSize']))
if (s['rxMsgQueueSize'] or 0) > 1:
self.win.addstr(" [in window:\t{}]".format(s['rxMsgQueueSize']))
self.win.addstr(" [in window: {}]".format(s['rxMsgQueueSize']))
def display(acks, label, num='acks', dem='packets'):
if acks[dem] > 0:
self.win.addstr(" [{}: {}]".format(label, round(float(acks[num]) / float(acks[dem]), 2)))
if ('recvMACKs' in s) and ('sendMACKs' in s):
display(s['sendMACKs'], 'out MACK density')
display(s['recvMACKs'], 'in MACK density')
d = {'recvAcks': 'in acks', 'sendAcks': 'out acks', 'recvRTX': 'in RTX', 'sendRTX': 'out RTX'}
for k in d:
v = d[k]
if (k in s) and (s[k] > 0):
self.win.addstr(" [{}: {}]".format(v, s[k]))
return y
def display_dht(self, y, data):


@ -20,6 +20,9 @@ else()
target_link_directories(${exe} PRIVATE /usr/local/lib)
endif()
target_link_libraries(${exe} PRIVATE liblokinet)
if(WITH_JEMALLOC)
target_link_libraries(${exe} PUBLIC jemalloc)
endif()
target_compile_definitions(${exe} PRIVATE -DVERSIONTAG=${GIT_VERSION_REAL})
add_log_tag(${exe})
endforeach()
@ -40,4 +43,7 @@ else()
target_link_libraries(lokinetctl PRIVATE ${CURL_LIBRARIES})
endif(CURL_FOUND)
install(PROGRAMS lokinet-vpn DESTINATION bin COMPONENT lokinet)
endif()

daemon/lokinet-vpn (new executable file, 98 lines)

@ -0,0 +1,98 @@
#!/usr/bin/env python3
import argparse
import sys
from socket import AF_INET
import requests
from pyroute2 import IPRoute
class LokinetRPC:
def __init__(self, url):
self._url = url
def _jsonrpc(self, method, params={}):
r = requests.post(
self._url,
headers={"Content-Type": "application/json", "Host": "localhost"},
json={
"jsonrpc": "2.0",
"id": "0",
"method": "{}".format(method),
"params": params,
},
)
return r.json()
def get_first_hops(self):
data = self._jsonrpc("llarp.admin.dumpstate")
for link in data['result']['links']['outbound']:
for session in link["sessions"]['established']:
yield session['remoteAddr']
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--rpc", type=str, default='127.0.0.1:1190')
ap.add_argument("--ifname", type=str, default="lokitun0")
ap.add_argument("--up", action='store_const', dest='action', const='up')
ap.add_argument("--down", action='store_const', dest='action', const='down')
args = ap.parse_args()
rpc = LokinetRPC('http://{}/jsonrpc'.format(args.rpc))
hops = dict()
for hop in rpc.get_first_hops():
ip = hop.split(':')[0]
hops[ip] = 0
if len(hops) == 0:
print("lokinet is not connected yet")
return 1
with IPRoute() as ip:
ip.bind()
try:
idx = ip.link_lookup(ifname=args.ifname)[0]
except:
print("cannot find {}".format(args.ifname))
return 1
gateways = ip.get_default_routes(family=AF_INET)
gateway = None
for g in gateways:
useThisGateway = True
for name, val in g['attrs']:
if name == 'RTA_OIF' and val == idx:
useThisGateway = False
if not useThisGateway:
continue
for name, val in g['attrs']:
if name == 'RTA_GATEWAY':
gateway = val
if gateway:
for address in hops:
try:
if args.action == 'up':
ip.route("add", dst="{}/32".format(address), gateway=gateway)
elif args.action == 'down':
ip.route("del", dst="{}/32".format(address), gateway=gateway)
except:
pass
if args.action == 'up':
try:
ip.route('add', dst='0.0.0.0/0', oif=idx)
except:
print('failed to add default route')
return 1
elif args.action == 'down':
try:
ip.route('del', dst='0.0.0.0/0', oif=idx)
except:
print('failed to remove default route')
return 1
else:
print("could not find gateway")
return 1
return 0
if __name__ == '__main__':
sys.exit(main())
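# Example invocation (assumed usage; requires root for route changes plus the
# `requests` and `pyroute2` modules imported above). `--up` pins /32 routes to
# lokinet's first hops via the current gateway and points the default route at
# the lokinet interface; `--down` removes them again.
#   sudo ./lokinet-vpn --up
#   sudo ./lokinet-vpn --down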


@ -14,6 +14,32 @@
#include <iostream>
#include <future>
#ifdef USE_JEMALLOC
#include <new>
#include <jemalloc/jemalloc.h>
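// With USE_JEMALLOC, the global operator new/delete overloads below simply call
// malloc/free, so the jemalloc library linked in by CMake also serves C++
// allocations made through new/delete.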
void*
operator new(std::size_t sz)
{
void* ptr = malloc(sz);
if (ptr)
return ptr;
else
throw std::bad_alloc{};
}
void
operator delete(void* ptr) noexcept
{
free(ptr);
}
void
operator delete(void* ptr, size_t) noexcept
{
free(ptr);
}
#endif
#ifdef _WIN32
#define wmin(x, y) (((x) < (y)) ? (x) : (y))
#define MIN wmin
@ -196,8 +222,7 @@ main(int argc, char* argv[])
if (genconfigOnly)
{
llarp::ensureConfig(
llarp::GetDefaultDataDir(), llarp::GetDefaultConfigPath(), overwrite, opts.isRelay);
llarp::ensureConfig(basedir, fname, overwrite, opts.isRelay);
}
else
{

debian/lokinet.lokinet-vpn.service (new file, 13 lines)

@ -0,0 +1,13 @@
[Unit]
Description=LokiNET VPN tunnel: shove all traffic over lokinet
Wants=lokinet.service
After=lokinet.service
[Service]
Type=oneshot
ExecStart=/usr/bin/lokinet-vpn --up
ExecStop=/usr/bin/lokinet-vpn --down
RemainAfterExit=true
[Install]
WantedBy=multi-user.target
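# Assuming the debian packaging installs this unit as lokinet-vpn.service, it can
# be enabled with:  systemctl enable --now lokinet-vpn.service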


@ -5,6 +5,7 @@
#include <constants/defaults.hpp>
#include <constants/files.hpp>
#include <net/net.hpp>
#include <net/ip.hpp>
#include <router_contact.hpp>
#include <stdexcept>
#include <util/fs.hpp>
@ -142,6 +143,8 @@ namespace llarp
conf.defineOption<std::string>(
"network", "type", false, "tun", AssignmentAcceptor(m_endpointType));
conf.defineOption<bool>("network", "exit", false, false, AssignmentAcceptor(m_AllowExit));
conf.defineOption<bool>(
"network",
"profiling",
@ -165,27 +168,59 @@ namespace llarp
conf.defineOption<bool>(
"network", "reachable", false, ReachableDefault, AssignmentAcceptor(m_reachable));
conf.defineOption<int>("network", "hops", false, HopsDefault, [](int arg) {
conf.defineOption<int>("network", "hops", false, HopsDefault, [this](int arg) {
if (arg < 1 or arg > 8)
throw std::invalid_argument("[endpoint]:hops must be >= 1 and <= 8");
m_hops = arg;
});
conf.defineOption<int>("network", "paths", false, PathsDefault, [](int arg) {
conf.defineOption<int>("network", "paths", false, PathsDefault, [this](int arg) {
if (arg < 1 or arg > 8)
throw std::invalid_argument("[endpoint]:paths must be >= 1 and <= 8");
m_paths = arg;
});
#ifdef LOKINET_EXITS
conf.defineOption<std::string>("network", "exit-node", false, "", [this](std::string arg) {
// TODO: validate as valid .loki / .snode address
// probably not .snode...?
m_exitNode = arg;
if (arg.empty())
return;
service::Address exit;
if (not exit.FromString(arg))
{
throw std::invalid_argument(stringify("[endpoint]:exit-node bad address: ", arg));
}
m_exitNode = exit;
});
#endif
conf.defineOption<std::string>("network", "mapaddr", false, "", [this](std::string arg) {
// TODO: parse / validate as loki_addr : IP addr pair
m_mapAddr = arg;
if (arg.empty())
return;
huint128_t ip;
service::Address addr;
const auto pos = arg.find(":");
if (pos == std::string::npos)
{
throw std::invalid_argument(stringify("[endpoint]:mapaddr invalid entry: ", arg));
}
std::string addrstr = arg.substr(0, pos);
std::string ipstr = arg.substr(pos + 1);
if (not ip.FromString(ipstr))
{
huint32_t ipv4;
if (not ipv4.FromString(ipstr))
{
throw std::invalid_argument(stringify("[endpoint]:mapaddr invalid ip: ", ipstr));
}
ip = net::ExpandV4(ipv4);
}
if (not addr.FromString(addrstr))
{
throw std::invalid_argument(stringify("[endpoint]:mapaddr invalid addresss: ", addrstr));
}
if (m_mapAddrs.find(ip) != m_mapAddrs.end())
{
throw std::invalid_argument(stringify("[endpoint]:mapaddr ip already mapped: ", ipstr));
}
m_mapAddrs[ip] = addr;
});
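// A hypothetical lokinet.ini fragment exercising the options parsed above
// (section and key names are the ones passed to defineOption; the addresses
// are placeholders):
//   [network]
//   exit-node=exitbroker.loki
//   mapaddr=somesnapp.loki:10.11.12.13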
conf.defineOption<std::string>("network", "ifaddr", false, "", [this](std::string arg) {
@ -237,7 +272,7 @@ namespace llarp
m_upstreamDNS.push_back(parseDNSAddr(std::move(arg)));
});
conf.defineOption<std::string>("dns", "bind", false, std::nullopt, [=](std::string arg) {
conf.defineOption<std::string>("dns", "bind", false, "127.3.2.1:53", [=](std::string arg) {
m_bind = parseDNSAddr(std::move(arg));
});
}
@ -796,14 +831,12 @@ namespace llarp
"Adds a `.snode` address to the blacklist.",
});
#ifdef LOKINET_EXITS
def.addOptionComments(
"network",
"exit-node",
{
"Specify a `.snode` or `.loki` address to use as an exit broker.",
"Specify a `.loki` address to use as an exit broker.",
});
#endif
def.addOptionComments(
"network",


@ -10,6 +10,8 @@
#include <config/definition.hpp>
#include <constants/files.hpp>
#include <net/ip_address.hpp>
#include <net/net_int.hpp>
#include <service/address.hpp>
#include <cstdlib>
#include <functional>
@ -75,11 +77,10 @@ namespace llarp
bool m_reachable = false;
int m_hops = -1;
int m_paths = -1;
bool m_AllowExit = false;
std::set<RouterID> m_snodeBlacklist;
#ifdef LOKINET_EXITS
std::string m_exitNode;
#endif
std::string m_mapAddr;
std::optional<service::Address> m_exitNode;
std::unordered_map<huint128_t, service::Address> m_mapAddrs;
// TODO:
// on-up


@ -0,0 +1,7 @@
#pragma once
namespace llarp
{
/// default queue length for logic jobs
constexpr std::size_t event_loop_queue_size = 1024;
} // namespace llarp


@ -31,9 +31,13 @@ namespace llarp
constexpr auto build_timeout = 30s;
/// measure latency at this interval
constexpr auto latency_interval = 5s;
constexpr auto latency_interval = 20s;
/// if a path is inactive for this amount of time it's dead
constexpr auto alive_timeout = 30s;
constexpr auto alive_timeout = latency_interval * 1.5;
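// with latency_interval = 20s this still evaluates to 30s, matching the
// previous hard-coded constant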
/// how big transit hop traffic queues are
constexpr std::size_t transit_hop_queue_size = 256;
} // namespace path
} // namespace llarp


@ -45,10 +45,7 @@ namespace llarp
if (threads <= 0)
threads = 1;
worker = std::make_shared<llarp::thread::ThreadPool>(threads, 1024, "llarp-worker");
auto jobQueueSize = config->router.m_JobQueueSize;
if (jobQueueSize < 1024)
jobQueueSize = 1024;
logic = std::make_shared<Logic>(jobQueueSize);
logic = std::make_shared<Logic>();
nodedb_dir = fs::path(config->router.m_dataDir / nodedb_dirname).string();
@ -80,7 +77,10 @@ namespace llarp
llarp::LogInfo(llarp::VERSION_FULL, " ", llarp::RELEASE_MOTTO);
llarp::LogInfo("starting up");
if (mainloop == nullptr)
mainloop = llarp_make_ev_loop();
{
auto jobQueueSize = std::max(event_loop_queue_size, config->router.m_JobQueueSize);
mainloop = llarp_make_ev_loop(jobQueueSize);
}
logic->set_event_loop(mainloop.get());
mainloop->set_logic(logic);


@ -53,7 +53,7 @@ namespace llarp
operator()(const TXOwner& o) const noexcept
{
std::size_t sz2;
memcpy(&sz2, &o.node[0], sizeof(std::size_t));
memcpy(&sz2, o.node.data(), sizeof(std::size_t));
return o.txid ^ (sz2 << 1);
}
};


@ -15,9 +15,9 @@
#endif
llarp_ev_loop_ptr
llarp_make_ev_loop()
llarp_make_ev_loop(size_t queueLength)
{
llarp_ev_loop_ptr r = std::make_shared<libuv::Loop>();
llarp_ev_loop_ptr r = std::make_shared<libuv::Loop>(queueLength);
r->init();
r->update_time();
return r;
@ -26,12 +26,7 @@ llarp_make_ev_loop()
void
llarp_ev_loop_run_single_process(llarp_ev_loop_ptr ev, std::shared_ptr<llarp::Logic> logic)
{
while (ev->running())
{
ev->update_time();
ev->tick(EV_TICK_INTERVAL);
llarp::LogContext::Instance().logStream->Tick(ev->time_now());
}
ev->run();
logic->clear_event_loop();
ev->stopped();
}


@ -25,6 +25,8 @@
#include <uv.h>
#endif
#include <constants/evloop.hpp>
/**
* ev.h
*
@ -47,8 +49,9 @@ using llarp_ev_loop_ptr = std::shared_ptr<llarp_ev_loop>;
/// make an event loop using our baked in event loop on Windows
/// make an event loop using libuv otherwise.
/// @param queue_size how big the logic job queue is
llarp_ev_loop_ptr
llarp_make_ev_loop();
llarp_make_ev_loop(std::size_t queue_size = llarp::event_loop_queue_size);
// run mainloop
void
@ -62,10 +65,6 @@ llarp_ev_loop_time_now_ms(const llarp_ev_loop_ptr& ev);
void
llarp_ev_loop_stop(const llarp_ev_loop_ptr& ev);
/// list of packets we recv'd
/// forward declared
struct llarp_pkt_list;
/// UDP handling configuration
struct llarp_udp_io
{
@ -83,11 +82,6 @@ struct llarp_udp_io
int (*sendto)(struct llarp_udp_io*, const llarp::SockAddr&, const byte_t*, size_t);
};
/// get all packets received last tick
/// return true if we got packets return false if we didn't
bool
llarp_ev_udp_recvmany(struct llarp_udp_io* udp, struct llarp_pkt_list* pkts);
/// add UDP handler
int
llarp_ev_add_udp(struct llarp_ev_loop* ev, struct llarp_udp_io* udp, const llarp::SockAddr& src);


@ -812,72 +812,4 @@ struct llarp_ev_loop
call_soon(std::function<void(void)> f) = 0;
};
struct PacketBuffer
{
PacketBuffer(PacketBuffer&& other)
{
_ptr = other._ptr;
_sz = other._sz;
other._ptr = nullptr;
other._sz = 0;
}
PacketBuffer(const PacketBuffer&) = delete;
PacketBuffer&
operator=(const PacketBuffer&) = delete;
PacketBuffer() : PacketBuffer(nullptr, 0){};
explicit PacketBuffer(size_t sz) : _sz{sz}
{
_ptr = new char[sz];
}
PacketBuffer(char* buf, size_t sz)
{
_ptr = buf;
_sz = sz;
}
~PacketBuffer()
{
if (_ptr)
delete[] _ptr;
}
byte_t*
data()
{
return (byte_t*)_ptr;
}
size_t
size()
{
return _sz;
}
byte_t& operator[](size_t sz)
{
return data()[sz];
}
void
reserve(size_t sz)
{
if (_ptr)
delete[] _ptr;
_ptr = new char[sz];
_sz = sz;
}
private:
char* _ptr = nullptr;
size_t _sz = 0;
};
struct PacketEvent
{
llarp::SockAddr remote;
PacketBuffer pkt;
};
struct llarp_pkt_list : public std::vector<PacketEvent>
{
};
#endif


@ -397,8 +397,7 @@ namespace libuv
uv_check_t m_Ticker;
llarp_udp_io* const m_UDP;
llarp::SockAddr m_Addr;
llarp_pkt_list m_LastPackets;
std::array<char, 1500> m_Buffer;
std::vector<char> m_Buffer;
udp_glue(uv_loop_t* loop, llarp_udp_io* udp, const llarp::SockAddr& src)
: m_UDP(udp), m_Addr(src)
@ -410,11 +409,13 @@ namespace libuv
}
static void
Alloc(uv_handle_t*, size_t suggested_size, uv_buf_t* buf)
Alloc(uv_handle_t* h, size_t suggested_size, uv_buf_t* buf)
{
const size_t sz = std::min(suggested_size, size_t{1500});
buf->base = new char[sz];
buf->len = sz;
udp_glue* self = static_cast<udp_glue*>(h->data);
if (self->m_Buffer.empty())
self->m_Buffer.resize(suggested_size);
buf->base = self->m_Buffer.data();
buf->len = self->m_Buffer.size();
}
/// callback for libuv
@ -424,16 +425,6 @@ namespace libuv
udp_glue* glue = static_cast<udp_glue*>(handle->data);
if (addr)
glue->RecvFrom(nread, buf, llarp::SockAddr(*addr));
if (nread <= 0 || glue->m_UDP == nullptr || glue->m_UDP->recvfrom != nullptr)
delete[] buf->base;
}
bool
RecvMany(llarp_pkt_list* pkts)
{
*pkts = std::move(m_LastPackets);
m_LastPackets = llarp_pkt_list();
return pkts->size() > 0;
}
void
@ -447,11 +438,6 @@ namespace libuv
const llarp_buffer_t pkt((const byte_t*)buf->base, pktsz);
m_UDP->recvfrom(m_UDP, fromaddr, ManagedBuffer{pkt});
}
else
{
PacketBuffer pbuf(buf->base, pktsz);
m_LastPackets.emplace_back(PacketEvent{fromaddr, std::move(pbuf)});
}
}
}
@ -760,12 +746,15 @@ namespace libuv
OnAsyncWake(uv_async_t* async_handle)
{
Loop* loop = static_cast<Loop*>(async_handle->data);
loop->update_time();
loop->process_timer_queue();
loop->process_cancel_queue();
loop->FlushLogic();
llarp::LogContext::Instance().logStream->Tick(loop->time_now());
}
Loop::Loop() : llarp_ev_loop(), m_LogicCalls(1024), m_timerQueue(20), m_timerCancelQueue(20)
Loop::Loop(size_t queue_size)
: llarp_ev_loop(), m_LogicCalls(queue_size), m_timerQueue(20), m_timerCancelQueue(20)
{
}
@ -787,12 +776,13 @@ namespace libuv
#endif
m_TickTimer = new uv_timer_t;
m_TickTimer->data = this;
if (uv_timer_init(&m_Impl, m_TickTimer) == -1)
return false;
m_Run.store(true);
m_nextID.store(0);
m_WakeUp.data = this;
uv_async_init(&m_Impl, &m_WakeUp, &OnAsyncWake);
return uv_timer_init(&m_Impl, m_TickTimer) != -1;
return true;
}
void
@ -829,6 +819,11 @@ namespace libuv
int
Loop::run()
{
uv_timer_start(
m_TickTimer,
[](uv_timer_t* t) { static_cast<Loop*>(t->loop->data)->FlushLogic(); },
1000,
1000);
return uv_run(&m_Impl, UV_RUN_DEFAULT);
}
@ -923,11 +918,7 @@ namespace libuv
while (not m_timerCancelQueue.empty())
{
uint64_t job_id = m_timerCancelQueue.popFront();
auto itr = m_pendingCalls.find(job_id);
if (itr != m_pendingCalls.end())
{
m_pendingCalls.erase(itr);
}
m_pendingCalls.erase(job_id);
}
}
@ -937,8 +928,9 @@ namespace libuv
auto itr = m_pendingCalls.find(job_id);
if (itr != m_pendingCalls.end())
{
LogicCall(m_Logic, itr->second);
m_pendingCalls.erase(itr);
if (itr->second)
itr->second();
m_pendingCalls.erase(itr->first);
}
}
@ -1063,9 +1055,3 @@ namespace libuv
}
} // namespace libuv
bool
llarp_ev_udp_recvmany(struct llarp_udp_io* u, struct llarp_pkt_list* pkts)
{
return static_cast<libuv::udp_glue*>(u->impl)->RecvMany(pkts);
}


@ -24,7 +24,7 @@ namespace libuv
Callback callback;
};
Loop();
Loop(size_t queue_size);
bool
init() override;


@ -65,7 +65,7 @@ namespace llarp
}
void
BaseSession::BlacklistSnode(const RouterID snode)
BaseSession::BlacklistSNode(const RouterID snode)
{
m_SnodeBlacklist.insert(std::move(snode));
}
@ -99,7 +99,7 @@ namespace llarp
bool
BaseSession::CheckPathDead(path::Path_ptr, llarp_time_t dlt)
{
return dlt >= 10s;
return dlt >= path::alive_timeout;
}
void
@ -359,5 +359,25 @@ namespace llarp
{
return "Exit::" + m_ExitRouter.ToString();
}
void
SNodeSession::SendPacketToRemote(const llarp_buffer_t& buf)
{
net::IPPacket pkt;
if (not pkt.Load(buf))
return;
pkt.ZeroAddresses();
QueueUpstreamTraffic(std::move(pkt), llarp::routing::ExitPadSize);
}
void
ExitSession::SendPacketToRemote(const llarp_buffer_t& buf)
{
net::IPPacket pkt;
if (not pkt.Load(buf))
return;
pkt.ZeroSourceAddress();
QueueUpstreamTraffic(std::move(pkt), llarp::routing::ExitPadSize);
}
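// Note the asymmetry: SNode sessions zero both addresses before queueing
// upstream, while exit sessions zero only the source and keep the destination
// so the exit knows where to forward the packet.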
} // namespace exit
} // namespace llarp


@ -45,7 +45,7 @@ namespace llarp
}
void
BlacklistSnode(const RouterID snode);
BlacklistSNode(const RouterID snode) override;
util::StatusObject
ExtractStatus() const;
@ -187,6 +187,9 @@ namespace llarp
std::string
Name() const override;
virtual void
SendPacketToRemote(const llarp_buffer_t& pkt) override;
protected:
void
PopulateRequest(llarp::routing::ObtainExitMessage& msg) const override
@ -213,6 +216,9 @@ namespace llarp
std::string
Name() const override;
virtual void
SendPacketToRemote(const llarp_buffer_t& pkt) override;
protected:
void
PopulateRequest(llarp::routing::ObtainExitMessage& msg) const override


@ -33,6 +33,9 @@ namespace llarp
{
return false;
}
void
SendPacketToRemote(const llarp_buffer_t&) override{};
};
} // namespace handlers
} // namespace llarp


@ -13,6 +13,7 @@
#include <ev/ev.hpp>
#include <router/abstractrouter.hpp>
#include <service/context.hpp>
#include <service/endpoint_state.hpp>
#include <util/meta/memfn.hpp>
#include <util/thread/logic.hpp>
#include <nodedb.hpp>
@ -26,7 +27,7 @@ namespace llarp
void
TunEndpoint::FlushToUser(std::function<bool(net::IPPacket&)> send)
{
m_ExitMap.ForEachValue([](const auto& exit) { exit->FlushDownstream(); });
m_ExitMap.ForEachValue([r = Router()](const auto& exit) { exit->DownstreamFlush(r); });
// flush network to user
m_NetworkToUserPktQueue.Process(send);
}
@ -42,12 +43,7 @@ namespace llarp
TunEndpoint::tunifTick(llarp_tun_io* tun)
{
auto* self = static_cast<TunEndpoint*>(tun->user);
const auto now = self->Now();
if (self->ShouldFlushNow(now))
{
self->m_LastFlushAt = now;
LogicCall(self->m_router->logic(), [self]() { self->Flush(); });
}
self->Flush();
}
TunEndpoint::TunEndpoint(AbstractRouter* r, service::Context* parent, bool lazyVPN)
@ -168,86 +164,12 @@ namespace llarp
}
*/
#ifdef LOKINET_EXITS
if (not conf.m_exitNode.empty())
{
IPRange exitRange;
llarp::RouterID exitRouter;
std::string routerStr;
const auto pos = conf.m_exitNode.find(",");
if (pos != std::string::npos)
{
auto range_str = conf.m_exitNode.substr(1 + pos);
if (!exitRange.FromString(range_str))
{
LogError("bad exit range: '", range_str, "'");
return false;
}
routerStr = conf.m_exitNode.substr(0, pos);
}
else
{
routerStr = conf.m_exitNode;
}
routerStr = TrimWhitespace(routerStr);
if (!(exitRouter.FromString(routerStr)
|| HexDecode(routerStr.c_str(), exitRouter.begin(), exitRouter.size())))
{
llarp::LogError(Name(), " bad exit router key: ", routerStr);
return false;
}
auto exit = std::make_shared<llarp::exit::ExitSession>(
exitRouter,
util::memFn(&TunEndpoint::QueueInboundPacketForExit, this),
m_router,
numPaths,
numHops,
ShouldBundleRC());
m_ExitMap.Insert(exitRange, exit);
llarp::LogInfo(Name(), " using exit at ", exitRouter, " for ", exitRange);
}
#endif
m_LocalResolverAddr = dnsConf.m_bind;
m_UpstreamResolvers = dnsConf.m_upstreamDNS;
if (not conf.m_mapAddr.empty())
for (const auto& item : conf.m_mapAddrs)
{
auto pos = conf.m_mapAddr.find(":");
if (pos == std::string::npos)
{
llarp::LogError(
"Cannot map address ",
conf.m_mapAddr,
" invalid format, missing colon (:), expects "
"address.loki:ip.address.goes.here");
return false;
}
service::Address addr;
auto addr_str = conf.m_mapAddr.substr(0, pos);
if (!addr.FromString(addr_str))
{
llarp::LogError(Name() + " cannot map invalid address ", addr_str);
return false;
}
auto ip_str = conf.m_mapAddr.substr(pos + 1);
huint32_t ip;
huint128_t ipv6;
if (ip.FromString(ip_str))
{
ipv6 = net::ExpandV4(ip);
}
else if (ipv6.FromString(ip_str))
{
}
else
{
llarp::LogError(Name(), "failed to map ", ip_str, " failed to parse IP");
return false;
}
if (not MapAddress(addr, ipv6, false))
if (not MapAddress(item.second, item.first, false))
return false;
}
@ -315,19 +237,9 @@ namespace llarp
void
TunEndpoint::Flush()
{
static const auto func = [](auto self) {
self->FlushSend();
self->m_ExitMap.ForEachValue([](const auto& exit) { exit->FlushUpstream(); });
self->Pump(self->Now());
};
if (NetworkIsIsolated())
{
LogicCall(RouterLogic(), std::bind(func, shared_from_this()));
}
else
{
func(this);
}
FlushSend();
m_ExitMap.ForEachValue([r = Router()](const auto& exit) { exit->UpstreamFlush(r); });
Pump(Now());
}
static bool
@ -659,7 +571,7 @@ namespace llarp
const auto blacklist = SnodeBlacklist();
m_ExitMap.ForEachValue([blacklist](const auto& exit) {
for (const auto& snode : blacklist)
exit->BlacklistSnode(snode);
exit->BlacklistSNode(snode);
});
return SetupNetworking();
}
@ -844,10 +756,7 @@ namespace llarp
void
TunEndpoint::Tick(llarp_time_t now)
{
m_ExitMap.ForEachValue([&](const auto& exit) {
this->EnsureRouterIsKnown(exit->Endpoint());
exit->Tick(now);
});
m_ExitMap.ForEachValue([&](const auto& exit) { exit->Tick(now); });
Endpoint::Tick(now);
}
@ -864,29 +773,41 @@ namespace llarp
m_UserToNetworkPktQueue.Process([&](net::IPPacket& pkt) {
std::function<bool(const llarp_buffer_t&)> sendFunc;
huint128_t dst;
huint128_t dst, src;
if (pkt.IsV4())
dst = net::ExpandV4(pkt.dstv4());
{
dst = pkt.dst4to6();
src = pkt.src4to6();
}
else
{
dst = pkt.dstv6();
src = pkt.srcv6();
}
auto itr = m_IPToAddr.find(dst);
if (itr == m_IPToAddr.end())
{
const auto exits = m_ExitMap.FindAll(dst);
for (const auto& exit : exits)
if (IsBogon(dst) or not m_state->m_ExitNode.has_value())
{
if (pkt.IsV4() && !llarp::IsIPv4Bogon(pkt.dstv4()))
// send icmp unreachable
const auto icmp = pkt.MakeICMPUnreachable();
if (icmp.has_value())
{
pkt.UpdateIPv4Address({0}, xhtonl(pkt.dstv4()));
exit->QueueUpstreamTraffic(std::move(pkt), llarp::routing::ExitPadSize);
}
else if (pkt.IsV6())
{
pkt.UpdateIPv6Address({0}, pkt.dstv6());
exit->QueueUpstreamTraffic(std::move(pkt), llarp::routing::ExitPadSize);
HandleWriteIPPacket(icmp->ConstBuffer(), dst, src);
}
}
else
{
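// no local mapping for this destination: zero our source address and hand the
// whole IP packet to the configured exit's .loki address over eProtocolExit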
pkt.ZeroSourceAddress();
MarkAddressOutbound(*m_state->m_ExitNode);
EnsurePathToService(
*m_state->m_ExitNode,
[pkt, self = this](service::Address, service::OutboundContext*) {
self->SendToServiceOrQueue(
*self->m_state->m_ExitNode, pkt.ConstBuffer(), service::eProtocolExit);
},
1s);
}
return;
}
if (m_SNodes.at(itr->second))
@ -897,6 +818,15 @@ namespace llarp
itr->second.as_array(),
std::placeholders::_1);
}
else if (m_state->m_ExitEnabled)
{
sendFunc = std::bind(
&TunEndpoint::SendToServiceOrQueue,
this,
service::Address(itr->second.as_array()),
std::placeholders::_1,
service::eProtocolExit);
}
else
{
sendFunc = std::bind(
@ -908,11 +838,13 @@ namespace llarp
}
// prepare packet for insertion into network
// this includes clearing IP addresses, recalculating checksums, etc
if (pkt.IsV4())
pkt.UpdateIPv4Address({0}, {0});
else
pkt.UpdateIPv6Address({0}, {0});
if (not m_state->m_ExitEnabled)
{
if (pkt.IsV4())
pkt.UpdateIPv4Address({0}, {0});
else
pkt.UpdateIPv6Address({0}, {0});
}
if (sendFunc && sendFunc(pkt.Buffer()))
{
MarkIPActive(dst);
@ -923,38 +855,65 @@ namespace llarp
}
bool
TunEndpoint::HandleWriteIPPacket(
const llarp_buffer_t& b, std::function<huint128_t(void)> getFromIP)
TunEndpoint::HandleInboundPacket(
const service::ConvoTag tag, const llarp_buffer_t& buf, service::ProtocolType t)
{
if (t != service::eProtocolTrafficV4 && t != service::eProtocolTrafficV6
&& t != service::eProtocolExit)
return false;
AlignedBuffer<32> addr;
bool snode = false;
if (!GetEndpointWithConvoTag(tag, addr, snode))
return false;
huint128_t src, dst;
net::IPPacket pkt;
if (not pkt.Load(buf))
return false;
if (m_state->m_ExitNode == service::Address{addr.as_array()} and t == service::eProtocolExit)
{
// client side from exit
if (pkt.IsV4())
src = pkt.src4to6();
else if (pkt.IsV6())
src = pkt.srcv6();
dst = m_OurIP;
}
else if (m_state->m_ExitEnabled)
{
// exit side from exit
src = ObtainIPForAddr(addr, snode);
if (pkt.IsV4())
dst = pkt.dst4to6();
else if (pkt.IsV6())
dst = pkt.dstv6();
}
else
{
// snapp traffic
src = ObtainIPForAddr(addr, snode);
dst = m_OurIP;
}
HandleWriteIPPacket(buf, src, dst);
return true;
}
bool
TunEndpoint::HandleWriteIPPacket(const llarp_buffer_t& b, huint128_t src, huint128_t dst)
{
// llarp::LogInfo("got packet from ", msg->sender.Addr());
auto themIP = getFromIP();
// llarp::LogInfo("themIP ", themIP);
auto usIP = m_OurIP;
ManagedBuffer buf(b);
return m_NetworkToUserPktQueue.EmplaceIf([buf, themIP, usIP](net::IPPacket& pkt) -> bool {
return m_NetworkToUserPktQueue.EmplaceIf([buf, src, dst](net::IPPacket& pkt) -> bool {
// load
if (!pkt.Load(buf))
return false;
// filter out:
// - packets smaller than minimal IPv4 header
// - non-IPv4 packets
// - packets with weird src/dst addresses
// (0.0.0.0/8 but not 0.0.0.0)
// - packets with 0 src but non-0 dst and the opposite
if (pkt.IsV4())
{
auto hdr = pkt.Header();
if (pkt.sz < sizeof(*hdr) || (hdr->saddr != 0 && *(byte_t*)&(hdr->saddr) == 0)
|| (hdr->daddr != 0 && *(byte_t*)&(hdr->daddr) == 0)
|| ((hdr->saddr == 0) != (hdr->daddr == 0)))
{
return false;
}
pkt.UpdateIPv4Address(xhtonl(net::TruncateV6(themIP)), xhtonl(net::TruncateV6(usIP)));
pkt.UpdateIPv4Address(xhtonl(net::TruncateV6(src)), xhtonl(net::TruncateV6(dst)));
}
else if (pkt.IsV6())
{
pkt.UpdateIPv6Address(themIP, usIP);
pkt.UpdateIPv6Address(src, dst);
}
return true;
});
@ -1060,34 +1019,21 @@ namespace llarp
{
// called in the isolated network thread
auto* self = static_cast<TunEndpoint*>(tun->user);
auto _pkts = std::move(self->m_TunPkts);
self->m_TunPkts = std::vector<net::IPPacket>();
LogicCall(self->EndpointLogic(), [tun, self, pkts = std::move(_pkts)]() {
for (auto& pkt : pkts)
self->FlushToUser([self, tun](net::IPPacket& pkt) -> bool {
if (not llarp_ev_tun_async_write(tun, pkt.Buffer()))
{
self->m_UserToNetworkPktQueue.Emplace(pkt);
llarp::LogWarn(self->Name(), " packet dropped");
}
self->FlushToUser([self, tun](net::IPPacket& pkt) -> bool {
if (!llarp_ev_tun_async_write(tun, pkt.Buffer()))
{
llarp::LogWarn(self->Name(), " packet dropped");
return true;
}
return false;
});
return false;
});
}
} // namespace handlers
void
TunEndpoint::tunifRecvPkt(llarp_tun_io* tun, const llarp_buffer_t& b)
{
// called for every packet read from user in isolated network thread
auto* self = static_cast<TunEndpoint*>(tun->user);
net::IPPacket pkt;
if (not pkt.Load(b))
return;
self->m_TunPkts.emplace_back(pkt);
self->m_UserToNetworkPktQueue.EmplaceIf([&](net::IPPacket& pkt) { return pkt.Load(b); });
}
TunEndpoint::~TunEndpoint() = default;


@ -33,6 +33,9 @@ namespace llarp
bool
Configure(const NetworkConfig& conf, const DnsConfig& dnsConf) override;
void
SendPacketToRemote(const llarp_buffer_t&) override{};
void
Tick(llarp_time_t now) override;
@ -81,21 +84,11 @@ namespace llarp
/// overrides Endpoint
bool
HandleInboundPacket(
const service::ConvoTag tag, const llarp_buffer_t& pkt, service::ProtocolType t) override
{
if (t != service::eProtocolTrafficV4 && t != service::eProtocolTrafficV6)
return false;
AlignedBuffer<32> addr;
bool snode = false;
if (!GetEndpointWithConvoTag(tag, addr, snode))
return false;
return HandleWriteIPPacket(
pkt, [=]() -> huint128_t { return ObtainIPForAddr(addr, snode); });
}
const service::ConvoTag tag, const llarp_buffer_t& pkt, service::ProtocolType t) override;
/// handle inbound traffic
bool
HandleWriteIPPacket(const llarp_buffer_t& buf, std::function<huint128_t(void)> getFromIP);
HandleWriteIPPacket(const llarp_buffer_t& buf, huint128_t src, huint128_t dst);
/// queue outbound packet to the world
bool
@ -192,8 +185,6 @@ namespace llarp
net::IPPacket::CompareOrder,
net::IPPacket::GetNow>;
/// queue packet for send on net thread from user
std::vector<net::IPPacket> m_TunPkts;
/// queue for sending packets over the network from us
PacketQueue_t m_UserToNetworkPktQueue;
/// queue for sending packets to user from network
@ -232,35 +223,6 @@ namespace llarp
return nullptr;
}
bool
QueueInboundPacketForExit(const llarp_buffer_t& buf)
{
ManagedBuffer copy{buf};
return m_NetworkToUserPktQueue.EmplaceIf([&](llarp::net::IPPacket& pkt) -> bool {
if (!pkt.Load(copy.underlying))
return false;
if (SupportsV6())
{
if (pkt.IsV4())
{
pkt.UpdateIPv6Address(net::ExpandV4(pkt.srcv4()), m_OurIP);
}
else
{
pkt.UpdateIPv6Address(pkt.srcv6(), m_OurIP);
}
}
else
{
if (pkt.IsV4())
pkt.UpdateIPv4Address(xhtonl(pkt.srcv4()), xhtonl(net::TruncateV6(m_OurIP)));
else
return false;
}
return true;
});
}
template <typename Addr_t, typename Endpoint_t>
void
SendDNSReply(


@ -19,6 +19,7 @@ namespace llarp
{
const llarp_buffer_t buf(m_Data);
CryptoManager::instance()->shorthash(m_Digest, buf);
m_Acks.set(0);
}
ILinkSession::Packet_t


@ -143,10 +143,8 @@ namespace llarp
Session::EncryptWorker(CryptoQueue_ptr msgs)
{
LogDebug("encrypt worker ", msgs->size(), " messages");
auto itr = msgs->begin();
while (itr != msgs->end())
for (auto& pkt : *msgs)
{
Packet_t pkt = std::move(*itr);
llarp_buffer_t pktbuf(pkt);
const TunnelNonce nonce_ptr{pkt.data() + HMACSIZE};
pktbuf.base += PacketOverhead;
@ -157,7 +155,6 @@ namespace llarp
pktbuf.sz = pkt.size() - HMACSIZE;
CryptoManager::instance()->hmac(pkt.data(), pktbuf, m_SessionKey);
Send_LL(pkt.data(), pkt.size());
++itr;
}
}
@ -615,8 +612,9 @@ namespace llarp
recvMsgs->emplace_back(std::move(pkt));
}
LogDebug("decrypted ", recvMsgs->size(), " packets from ", m_RemoteAddr);
LogicCall(
m_Parent->logic(), std::bind(&Session::HandlePlaintext, shared_from_this(), recvMsgs));
LogicCall(m_Parent->logic(), [self = shared_from_this(), msgs = recvMsgs] {
self->HandlePlaintext(std::move(msgs));
});
}
void
@ -756,6 +754,7 @@ namespace llarp
{
return;
}
if (not itr->second.Verify())
{
LogError("bad short xmit hash from ", m_RemoteAddr);
@ -911,11 +910,6 @@ namespace llarp
return false;
}
}
else
{
// this case should never happen
::abort();
}
break;
case State::Introduction:
if (m_Inbound)


@ -204,7 +204,7 @@ namespace llarp
/// set of rx messages to send in next round of multiacks
std::unordered_set<uint64_t> m_SendMACKs;
using CryptoQueue_t = std::vector<Packet_t>;
using CryptoQueue_t = std::list<Packet_t>;
using CryptoQueue_ptr = std::shared_ptr<CryptoQueue_t>;
CryptoQueue_ptr m_EncryptNext;
CryptoQueue_ptr m_DecryptNext;


@ -116,7 +116,13 @@ namespace llarp
{
m_Loop = loop;
m_udp.user = this;
m_udp.recvfrom = nullptr;
m_udp.recvfrom = [](llarp_udp_io* udp, const llarp::SockAddr& from, ManagedBuffer pktbuf) {
ILinkSession::Packet_t pkt;
auto& buf = pktbuf.underlying;
pkt.resize(buf.sz);
std::copy_n(buf.base, buf.sz, pkt.data());
static_cast<ILinkLayer*>(udp->user)->RecvFrom(from, std::move(pkt));
};
m_udp.tick = &ILinkLayer::udp_tick;
if (ifname == "*")
{
@ -495,23 +501,7 @@ namespace llarp
ILinkLayer::udp_tick(llarp_udp_io* udp)
{
ILinkLayer* link = static_cast<ILinkLayer*>(udp->user);
auto pkts = std::make_shared<llarp_pkt_list>();
llarp_ev_udp_recvmany(&link->m_udp, pkts.get());
auto logic = link->logic();
if (logic == nullptr)
return;
LogicCall(logic, [pkts, link]() {
auto itr = pkts->begin();
while (itr != pkts->end())
{
if (link->m_RecentlyClosed.find(itr->remote) == link->m_RecentlyClosed.end())
{
link->RecvFrom(itr->remote, std::move(itr->pkt));
}
++itr;
}
link->Pump();
});
link->Pump();
}
} // namespace llarp


@ -45,7 +45,7 @@ namespace llarp
/// message delivery result hook function
using CompletionHandler = std::function<void(DeliveryStatus)>;
using Packet_t = PacketBuffer;
using Packet_t = std::vector<byte_t>;
using Message_t = std::vector<byte_t>;
/// send a message buffer to the remote endpoint


@ -9,6 +9,11 @@ namespace llarp
setAddress(str);
}
IpAddress::IpAddress(const IpAddress& other)
: m_empty(other.m_empty), m_ipAddress(other.m_ipAddress), m_port(other.m_port)
{
}
IpAddress::IpAddress(std::string_view str, std::optional<uint16_t> port)
{
setAddress(str, port);
@ -24,6 +29,16 @@ namespace llarp
m_empty = addr.isEmpty();
}
IpAddress&
IpAddress::operator=(IpAddress&& other)
{
m_ipAddress = std::move(other.m_ipAddress);
m_port = std::move(other.m_port);
m_empty = other.m_empty;
other.m_empty = false;
return *this;
}
IpAddress&
IpAddress::operator=(const sockaddr& other)
{
@ -38,6 +53,14 @@ namespace llarp
return *this;
}
IpAddress&
IpAddress::operator=(const IpAddress& other)
{
m_empty = other.m_empty;
m_ipAddress = other.m_ipAddress;
m_port = other.m_port;
return *this;
}
std::optional<uint16_t>
IpAddress::getPort() const


@ -21,6 +21,10 @@ namespace llarp
{
/// Empty constructor.
IpAddress() = default;
/// move constructor
IpAddress(IpAddress&&) = default;
/// copy constructor
IpAddress(const IpAddress&);
/// Constructor. Takes a string which can be an IPv4 or IPv6 address optionally followed by
/// a colon and a port.
@ -51,6 +55,14 @@ namespace llarp
IpAddress&
operator=(const sockaddr& other);
/// move assignment
IpAddress&
operator=(IpAddress&& other);
/// copy assignment
IpAddress&
operator=(const IpAddress& other);
/// Return the port. Returns -1 if no port has been provided.
///
/// @return the port, if present
@ -119,19 +131,7 @@ namespace llarp
std::size_t
operator()(const IpAddress& address) const noexcept
{
(void)address;
// throw std::runtime_error("FIXME: IpAddress::Hash"); // can't do this in operator(),
// apparently, so hopefully it's me that stumbles upon this (if not, sorry!)
return 0;
/*
if(a.af() == AF_INET)
{
return a.port() ^ a.addr4()->s_addr;
}
static const uint8_t empty[16] = {0};
return (a.af() + memcmp(a.addr6(), empty, 16)) ^ a.port();
*/
return std::hash<std::string>{}(address.toString());
}
};


@ -84,32 +84,32 @@ namespace llarp
return huint32_t{ntohl(Header()->daddr)};
}
#if 0
static uint32_t
ipchksum_pseudoIPv4(nuint32_t src_ip, nuint32_t dst_ip, uint8_t proto,
uint16_t innerlen)
huint128_t
IPPacket::dst4to6() const
{
#define IPCS(x) ((uint32_t)(x & 0xFFff) + (uint32_t)(x >> 16))
uint32_t sum = IPCS(src_ip.n) + IPCS(dst_ip.n) + (uint32_t)proto
+ (uint32_t)htons(innerlen);
#undef IPCS
return sum;
return ExpandV4(dstv4());
}
huint128_t
IPPacket::src4to6() const
{
return ExpandV4(srcv4());
}
static uint16_t
ipchksum(const byte_t *buf, size_t sz, uint32_t sum = 0)
ipchksum(const byte_t* buf, size_t sz, uint32_t sum = 0)
{
while(sz > 1)
while (sz > 1)
{
sum += *(const uint16_t *)buf;
sum += *(const uint16_t*)buf;
sz -= sizeof(uint16_t);
buf += sizeof(uint16_t);
}
if(sz != 0)
if (sz != 0)
{
uint16_t x = 0;
*(byte_t *)&x = *(const byte_t *)buf;
*(byte_t*)&x = *(const byte_t*)buf;
sum += x;
}
@ -120,7 +120,6 @@ namespace llarp
return uint16_t((~sum) & 0xFFff);
}
#endif
#define ADD32CS(x) ((uint32_t)(x & 0xFFff) + (uint32_t)(x >> 16))
#define SUB32CS(x) ((uint32_t)((~x) & 0xFFff) + (uint32_t)((~x) >> 16))
@ -419,5 +418,83 @@ namespace llarp
break;
}
}
void
IPPacket::ZeroAddresses()
{
if (IsV4())
{
UpdateIPv4Address({0}, {0});
}
else if (IsV6())
{
UpdateIPv6Address({0}, {0});
}
}
void
IPPacket::ZeroSourceAddress()
{
if (IsV4())
{
UpdateIPv4Address({0}, xhtonl(dstv4()));
}
else if (IsV6())
{
UpdateIPv6Address({0}, {ntoh128(dstv6().h)});
}
}
std::optional<IPPacket>
IPPacket::MakeICMPUnreachable() const
{
if (IsV4())
{
constexpr auto icmp_Header_size = 8;
constexpr auto ip_Header_size = 20;
net::IPPacket pkt{};
auto* pkt_Header = pkt.Header();
pkt_Header->version = 4;
pkt_Header->ihl = 0x05;
pkt_Header->tos = 0;
pkt_Header->check = 0;
pkt_Header->tot_len = ntohs(icmp_Header_size + ip_Header_size);
pkt_Header->saddr = Header()->daddr;
pkt_Header->daddr = Header()->saddr;
pkt_Header->protocol = 1; // ICMP
pkt_Header->ttl = 1;
pkt_Header->frag_off = htons(0b0100000000000000);
// size of ip header
const size_t l3_HeaderSize = Header()->ihl * 4;
// size of l4 packet to reflect back
const size_t l4_PacketSize = 8;
pkt_Header->tot_len += ntohs(l4_PacketSize + l3_HeaderSize);
uint16_t* checksum;
uint8_t* itr = pkt.buf + (pkt_Header->ihl * 4);
uint8_t* icmp_begin = itr; // type 'destination unreachable'
*itr++ = 3;
// code 'Destination host unknown error'
*itr++ = 7;
// checksum + unused
htobe32buf(itr, 0);
checksum = (uint16_t*)itr;
itr += 4;
// next hop mtu is ignored but let's put something here anyways just in case tm
htobe16buf(itr, 1500);
itr += 2;
// copy ip header and first 8 bytes of datagram for icmp reject
std::copy_n(buf, l4_PacketSize + l3_HeaderSize, itr);
itr += l4_PacketSize + l3_HeaderSize;
// calculate checksum of ip header
pkt_Header->check = ipchksum(pkt.buf, pkt_Header->ihl * 4);
const auto icmp_size = std::distance(icmp_begin, itr);
// calculate icmp checksum
*checksum = ipchksum(icmp_begin, icmp_size);
pkt.sz = ntohs(pkt_Header->tot_len);
return pkt;
}
return std::nullopt;
}
} // namespace net
} // namespace llarp


@ -241,6 +241,17 @@ namespace llarp
void
UpdateIPv6Address(huint128_t src, huint128_t dst);
/// set addresses to zero and recalculate checksums
void
ZeroAddresses();
/// zero out source address
void
ZeroSourceAddress();
/// make an icmp unreachable reply packet based on this ip packet
std::optional<IPPacket>
MakeICMPUnreachable() const;
};
} // namespace net


@ -596,6 +596,15 @@ namespace llarp
#endif
}
bool
IsBogon(const huint128_t ip)
{
const nuint128_t netIP{ntoh128(ip.h)};
in6_addr addr{};
std::copy_n((const uint8_t*)&netIP.n, 16, &addr.s6_addr[0]);
return IsBogon(addr);
}
bool
IsBogonRange(const in6_addr& host, const in6_addr&)
{


@ -94,6 +94,9 @@ namespace llarp
bool
IsBogon(const in6_addr& addr);
bool
IsBogon(const huint128_t addr);
bool
IsBogonRange(const in6_addr& host, const in6_addr& mask);


@ -8,6 +8,8 @@ namespace llarp
bool
IHopHandler::HandleUpstream(const llarp_buffer_t& X, const TunnelNonce& Y, AbstractRouter*)
{
if (not m_UpstreamReplayFilter.Insert(Y))
return false;
if (m_UpstreamQueue == nullptr)
m_UpstreamQueue = std::make_shared<TrafficQueue_t>();
m_UpstreamQueue->emplace_back();
@ -22,6 +24,8 @@ namespace llarp
bool
IHopHandler::HandleDownstream(const llarp_buffer_t& X, const TunnelNonce& Y, AbstractRouter*)
{
if (not m_DownstreamReplayFilter.Insert(Y))
return false;
if (m_DownstreamQueue == nullptr)
m_DownstreamQueue = std::make_shared<TrafficQueue_t>();
m_DownstreamQueue->emplace_back();
@ -31,5 +35,12 @@ namespace llarp
pkt.second = Y;
return true;
}
void
IHopHandler::DecayFilters(llarp_time_t now)
{
m_UpstreamReplayFilter.Decay(now);
m_DownstreamReplayFilter.Decay(now);
}
} // namespace path
} // namespace llarp


@ -4,6 +4,7 @@
#include <crypto/types.hpp>
#include <util/types.hpp>
#include <crypto/encrypted_frame.hpp>
#include <util/decaying_hashset.hpp>
#include <messages/relay.hpp>
#include <vector>
@ -25,11 +26,14 @@ namespace llarp
struct IHopHandler
{
using TrafficEvent_t = std::pair<std::vector<byte_t>, TunnelNonce>;
using TrafficQueue_t = std::vector<TrafficEvent_t>;
using TrafficQueue_t = std::list<TrafficEvent_t>;
using TrafficQueue_ptr = std::shared_ptr<TrafficQueue_t>;
virtual ~IHopHandler() = default;
void
DecayFilters(llarp_time_t now);
virtual bool
Expired(llarp_time_t now) const = 0;
@ -70,6 +74,8 @@ namespace llarp
uint64_t m_SequenceNum = 0;
TrafficQueue_ptr m_UpstreamQueue;
TrafficQueue_ptr m_DownstreamQueue;
util::DecayingHashSet<TunnelNonce> m_UpstreamReplayFilter;
util::DecayingHashSet<TunnelNonce> m_DownstreamReplayFilter;
virtual void
UpstreamWork(TrafficQueue_ptr queue, AbstractRouter* r) = 0;


@ -58,19 +58,6 @@ namespace llarp
EnterState(ePathBuilding, parent->Now());
}
bool
Path::HandleUpstream(const llarp_buffer_t& X, const TunnelNonce& Y, AbstractRouter* r)
{
return m_UpstreamReplayFilter.Insert(Y) and IHopHandler::HandleUpstream(X, Y, r);
}
bool
Path::HandleDownstream(const llarp_buffer_t& X, const TunnelNonce& Y, AbstractRouter* r)
{
return m_DownstreamReplayFilter.Insert(Y) and IHopHandler::HandleDownstream(X, Y, r);
}
void
Path::SetBuildResultHook(BuildResultHookFunc func)
{
@ -372,9 +359,6 @@ namespace llarp
m_RXRate = 0;
m_TXRate = 0;
m_UpstreamReplayFilter.Decay(now);
m_DownstreamReplayFilter.Decay(now);
if (_status == ePathBuilding)
{
if (buildStarted == 0s)
@ -394,9 +378,12 @@ namespace llarp
// check to see if this path is dead
if (_status == ePathEstablished)
{
const auto dlt = now - m_LastLatencyTestTime;
auto dlt = now - m_LastLatencyTestTime;
if (dlt > path::latency_interval && m_LastLatencyTestID == 0)
{
// bail doing test if we are active
if (now - m_LastRecvMessage < path::latency_interval)
return;
routing::PathLatencyMessage latency;
latency.T = randint();
m_LastLatencyTestID = latency.T;
@ -405,24 +392,12 @@ namespace llarp
FlushUpstream(r);
return;
}
if (m_LastRecvMessage > 0s && now > m_LastRecvMessage)
dlt = now - m_LastRecvMessage;
if (dlt >= path::alive_timeout)
{
const auto delay = now - m_LastRecvMessage;
if (m_CheckForDead && m_CheckForDead(shared_from_this(), delay))
{
LogWarn(Name(), " waited for ", dlt, " and path is unresponsive");
r->routerProfiling().MarkPathFail(this);
EnterState(ePathTimeout, now);
}
}
else if (dlt >= path::alive_timeout && m_LastRecvMessage == 0s)
{
if (m_CheckForDead && m_CheckForDead(shared_from_this(), dlt))
{
LogWarn(Name(), " waited for ", dlt, " and path looks dead");
r->routerProfiling().MarkPathFail(this);
EnterState(ePathTimeout, now);
}
LogWarn(Name(), " waited for ", dlt, " and path looks dead");
r->routerProfiling().MarkPathFail(this);
EnterState(ePathTimeout, now);
}
}
}
@ -464,31 +439,33 @@ namespace llarp
msg.pathid = TXID();
++idx;
}
LogicCall(
r->logic(),
std::bind(&Path::HandleAllUpstream, shared_from_this(), std::move(sendmsgs), r));
LogicCall(r->logic(), [self = shared_from_this(), data = std::move(sendmsgs), r]() {
self->HandleAllUpstream(std::move(data), r);
});
}
void
Path::FlushUpstream(AbstractRouter* r)
{
if (m_UpstreamQueue && !m_UpstreamQueue->empty())
if (m_UpstreamQueue && not m_UpstreamQueue->empty())
{
TrafficQueue_ptr data = nullptr;
std::swap(m_UpstreamQueue, data);
r->threadpool()->addJob(
std::bind(&Path::UpstreamWork, shared_from_this(), std::move(m_UpstreamQueue), r));
[self = shared_from_this(), data, r]() { self->UpstreamWork(std::move(data), r); });
}
m_UpstreamQueue = nullptr;
}
void
Path::FlushDownstream(AbstractRouter* r)
{
if (m_DownstreamQueue && !m_DownstreamQueue->empty())
if (m_DownstreamQueue && not m_DownstreamQueue->empty())
{
TrafficQueue_ptr data = nullptr;
std::swap(m_DownstreamQueue, data);
r->threadpool()->addJob(
std::bind(&Path::DownstreamWork, shared_from_this(), std::move(m_DownstreamQueue), r));
[self = shared_from_this(), data, r]() { self->DownstreamWork(std::move(data), r); });
}
m_DownstreamQueue = nullptr;
}
bool
@ -532,9 +509,9 @@ namespace llarp
sendMsgs[idx].X = buf;
++idx;
}
LogicCall(
r->logic(),
std::bind(&Path::HandleAllDownstream, shared_from_this(), std::move(sendMsgs), r));
LogicCall(r->logic(), [self = shared_from_this(), msgs = std::move(sendMsgs), r]() {
self->HandleAllDownstream(std::move(msgs), r);
});
}
void


@ -26,8 +26,6 @@
#include <unordered_set>
#include <vector>
#include <util/decaying_hashset.hpp>
namespace llarp
{
class Logic;
@ -282,11 +280,6 @@ namespace llarp
void
Rebuild();
bool
HandleUpstream(const llarp_buffer_t& X, const TunnelNonce& Y, AbstractRouter*) override;
bool
HandleDownstream(const llarp_buffer_t& X, const TunnelNonce& Y, AbstractRouter*) override;
void
Tick(llarp_time_t now, AbstractRouter* r);
@ -420,8 +413,6 @@ namespace llarp
uint64_t m_ExitObtainTX = 0;
PathStatus _status;
PathRole _role;
util::DecayingHashSet<TunnelNonce> m_UpstreamReplayFilter;
util::DecayingHashSet<TunnelNonce> m_DownstreamReplayFilter;
uint64_t m_LastRXRate = 0;
uint64_t m_RXRate = 0;
uint64_t m_LastTXRate = 0;


@ -314,7 +314,10 @@ namespace llarp
itr = map.erase(itr);
}
else
{
itr->second->DecayFilters(now);
++itr;
}
}
}
{
@ -328,7 +331,10 @@ namespace llarp
itr = map.erase(itr);
}
else
{
itr->second->DecayFilters(now);
++itr;
}
}
}
}


@ -5,6 +5,8 @@
#include <routing/dht_message.hpp>
#include <router/abstractrouter.hpp>
#include <random>
namespace llarp
{
namespace path
@ -161,6 +163,33 @@ namespace llarp
return chosen;
}
Path_ptr
PathSet::GetRandomPathByRouter(RouterID id, PathRole roles) const
{
Lock_t l(m_PathsMutex);
std::vector<Path_ptr> chosen;
auto itr = m_Paths.begin();
while (itr != m_Paths.end())
{
if (itr->second->IsReady() && itr->second->SupportsAnyRoles(roles))
{
if (itr->second->Endpoint() == id)
{
chosen.emplace_back(itr->second);
}
}
++itr;
}
if (chosen.empty())
return nullptr;
size_t idx = 0;
if (chosen.size() >= 2)
{
idx = rand() % chosen.size();
}
return chosen[idx];
}
Path_ptr
PathSet::GetByEndpointWithID(RouterID ep, PathID_t id) const
{


@ -197,6 +197,9 @@ namespace llarp
return false;
}
virtual void
BlacklistSNode(const RouterID) = 0;
/// override me in subtype
virtual bool HandleGotIntroMessage(std::shared_ptr<const dht::GotIntroMessage>)
{
@ -227,6 +230,9 @@ namespace llarp
Path_ptr
GetNewestPathByRouter(RouterID router, PathRole roles = ePathRoleAny) const;
Path_ptr
GetRandomPathByRouter(RouterID router, PathRole roles = ePathRoleAny) const;
Path_ptr
GetPathByID(PathID_t id) const;
@ -262,6 +268,9 @@ namespace llarp
virtual bool
BuildOneAlignedTo(const RouterID endpoint) = 0;
virtual void
SendPacketToRemote(const llarp_buffer_t& pkt) = 0;
void
ForEachPath(std::function<void(const Path_ptr&)> visit) const
{


@ -32,7 +32,8 @@ namespace llarp
return stream;
}
TransitHop::TransitHop() : m_UpstreamGather(128), m_DownstreamGather(128)
TransitHop::TransitHop()
: m_UpstreamGather(transit_hop_queue_size), m_DownstreamGather(transit_hop_queue_size)
{
m_UpstreamGather.enable();
m_DownstreamGather.enable();
@ -119,7 +120,6 @@ namespace llarp
void
TransitHop::DownstreamWork(TrafficQueue_ptr msgs, AbstractRouter* r)
{
m_DownstreamWorkCounter++;
auto flushIt = [self = shared_from_this(), r]() {
std::vector<RelayDownstreamMessage> msgs;
do
@ -161,7 +161,6 @@ namespace llarp
void
TransitHop::UpstreamWork(TrafficQueue_ptr msgs, AbstractRouter* r)
{
m_UpstreamWorkCounter++;
auto flushIt = [self = shared_from_this(), r]() {
std::vector<RelayUpstreamMessage> msgs;
do
@ -210,7 +209,7 @@ namespace llarp
FlushDownstream(r);
for (const auto& other : m_FlushOthers)
{
other->FlushUpstream(r);
other->FlushDownstream(r);
}
m_FlushOthers.clear();
}
@ -251,19 +250,28 @@ namespace llarp
void
TransitHop::FlushUpstream(AbstractRouter* r)
{
if (m_UpstreamQueue && !m_UpstreamQueue->empty())
r->threadpool()->addJob(std::bind(
&TransitHop::UpstreamWork, shared_from_this(), std::move(m_UpstreamQueue), r));
if (m_UpstreamQueue && not m_UpstreamQueue->empty())
{
if (r->threadpool()->addJob(std::bind(
&TransitHop::UpstreamWork, shared_from_this(), std::move(m_UpstreamQueue), r)))
{
m_UpstreamWorkCounter++;
}
}
m_UpstreamQueue = nullptr;
}
void
TransitHop::FlushDownstream(AbstractRouter* r)
{
if (m_DownstreamQueue && !m_DownstreamQueue->empty())
r->threadpool()->addJob(std::bind(
&TransitHop::DownstreamWork, shared_from_this(), std::move(m_DownstreamQueue), r));
if (m_DownstreamQueue && not m_DownstreamQueue->empty())
{
if (r->threadpool()->addJob(std::bind(
&TransitHop::DownstreamWork, shared_from_this(), std::move(m_DownstreamQueue), r)))
{
m_DownstreamWorkCounter++;
}
}
m_DownstreamQueue = nullptr;
}

View File

@ -24,9 +24,9 @@ namespace llarp
using SendStatusHandler = std::function<void(SendStatus)>;
static const size_t MAX_PATH_QUEUE_SIZE = 40;
static const size_t MAX_OUTBOUND_QUEUE_SIZE = 200;
static const size_t MAX_OUTBOUND_MESSAGES_PER_TICK = 20;
static const size_t MAX_PATH_QUEUE_SIZE = 100;
static const size_t MAX_OUTBOUND_QUEUE_SIZE = 1000;
static const size_t MAX_OUTBOUND_MESSAGES_PER_TICK = 500;
struct IOutboundMessageHandler
{

View File

@ -153,12 +153,11 @@ namespace llarp
void
Router::PumpLL()
{
static constexpr size_t PumpJobThreshhold = 50;
static constexpr auto PumpInterval = 25ms;
const auto now = Now();
if (_stopping.load())
return;
if (_logic->numPendingJobs() >= PumpJobThreshhold && _lastPump + PumpInterval >= now)
if (_lastPump + PumpInterval >= now)
{
return;
}
@ -174,11 +173,6 @@ namespace llarp
bool
Router::SendToOrQueue(const RouterID& remote, const ILinkMessage* msg, SendStatusHandler handler)
{
if (handler == nullptr)
{
using std::placeholders::_1;
handler = std::bind(&Router::MessageSent, this, remote, _1);
}
return _outboundMessageHandler.QueueMessage(remote, msg, handler);
}

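With the pending-job threshold gone, Router::PumpLL simply rate-limits itself to one pump per 25ms PumpInterval. A minimal sketch of that throttle in isolation, with an illustrative Throttle type (the router itself just keeps _lastPump and compares exactly as above):

#include <chrono>
#include <functional>

using namespace std::chrono_literals;

// run a callable at most once per interval, mirroring the check in PumpLL
struct Throttle
{
  std::chrono::milliseconds interval{25ms};
  std::chrono::steady_clock::time_point last{};

  void
  MaybeRun(const std::function<void()>& f)
  {
    const auto now = std::chrono::steady_clock::now();
    if (last + interval >= now)
      return;  // pumped too recently, skip
    last = now;
    f();
  }
};
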
View File

@ -613,12 +613,6 @@ namespace llarp
std::set<RouterID> exclude = prev;
for (const auto& snode : SnodeBlacklist())
exclude.insert(snode);
if (hop == 0)
{
const auto exits = GetExitRouters();
// exclude exit node as first hop in any paths
exclude.insert(exits.begin(), exits.end());
}
if (hop == numHops - 1)
{
// diversify endpoints
@ -633,13 +627,6 @@ namespace llarp
path::Builder::PathBuildStarted(path);
}
std::set<RouterID>
Endpoint::GetExitRouters() const
{
return m_ExitMap.TransformValues<RouterID>(
[](const exit::BaseSession_ptr& ptr) -> RouterID { return ptr->Endpoint(); });
}
void
Endpoint::PutNewOutboundContext(const service::IntroSet& introset)
{
@ -850,7 +837,9 @@ namespace llarp
bool
Endpoint::ProcessDataMessage(std::shared_ptr<ProtocolMessage> msg)
{
if (msg->proto == eProtocolTrafficV4 || msg->proto == eProtocolTrafficV6)
if ((msg->proto == eProtocolExit
&& (m_state->m_ExitEnabled || msg->sender.Addr() == m_state->m_ExitNode))
|| msg->proto == eProtocolTrafficV4 || msg->proto == eProtocolTrafficV6)
{
util::Lock l(m_state->m_InboundTrafficQueueMutex);
m_state->m_InboundTrafficQueue.emplace(msg);
@ -971,7 +960,6 @@ namespace llarp
static constexpr size_t NumParallelLookups = 2;
/// how many requests per router
static constexpr size_t RequestsPerLookup = 2;
LogInfo(Name(), " Ensure Path to ", remote.ToString());
MarkAddressOutbound(remote);
@ -1327,6 +1315,12 @@ namespace llarp
return m_state->m_Router;
}
void
Endpoint::BlacklistSNode(const RouterID snode)
{
m_state->m_SnodeBlacklist.insert(snode);
}
const std::set<RouterID>&
Endpoint::SnodeBlacklist() const
{

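Endpoint::ProcessDataMessage now also queues eProtocolExit traffic, but only when this endpoint allows exit traffic (m_ExitEnabled) or the sender is the configured exit node. A hedged restatement of that gate as a standalone predicate; the constant values mirror the protocol_type.hpp hunk further down, and the 64-bit alias for ProtocolType is an assumption:

#include <cstdint>

using ProtocolType = uint64_t;
constexpr ProtocolType eProtocolTrafficV4 = 1UL;
constexpr ProtocolType eProtocolTrafficV6 = 2UL;
constexpr ProtocolType eProtocolExit = 3UL;

// should an inbound data message be queued for delivery?
bool
ShouldQueueInbound(ProtocolType proto, bool weAllowExit, bool fromConfiguredExit)
{
  if (proto == eProtocolTrafficV4 || proto == eProtocolTrafficV6)
    return true;  // plain ip traffic is always accepted
  if (proto == eProtocolExit)
    return weAllowExit || fromConfiguredExit;  // exit traffic needs a reason
  return false;
}
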
View File

@ -155,10 +155,6 @@ namespace llarp
std::string
Name() const override;
/// get a set of all the routers we use as exit node
std::set<RouterID>
GetExitRouters() const;
bool
ShouldPublishDescriptors(llarp_time_t now) const override;
@ -261,6 +257,9 @@ namespace llarp
return false;
}
void
BlacklistSNode(const RouterID snode) override;
/// return true if we have a convotag as an exit session
/// or as a hidden service session
/// set addr and issnode
@ -425,11 +424,12 @@ namespace llarp
protected:
IDataHandler* m_DataHandler = nullptr;
Identity m_Identity;
net::IPRangeMap<exit::BaseSession_ptr> m_ExitMap;
net::IPRangeMap<path::PathSet_ptr> m_ExitMap;
hooks::Backend_ptr m_OnUp;
hooks::Backend_ptr m_OnDown;
hooks::Backend_ptr m_OnReady;
bool m_PublishIntroSet = true;
std::unique_ptr<EndpointState> m_state;
private:
void
@ -445,8 +445,6 @@ namespace llarp
const ConvoMap& Sessions() const;
ConvoMap& Sessions();
// clang-format on
std::unique_ptr<EndpointState> m_state;
thread::Queue<RecvDataEvent> m_RecvQueue;
};

View File

@ -15,7 +15,8 @@ namespace llarp
{
m_Keyfile = conf.m_keyfile;
m_SnodeBlacklist = conf.m_snodeBlacklist;
m_ExitEnabled = conf.m_AllowExit;
m_ExitNode = conf.m_exitNode;
// TODO:
/*
if (k == "on-up")

View File

@ -52,6 +52,8 @@ namespace llarp
std::string m_Keyfile;
std::string m_Name;
std::string m_NetNS;
bool m_ExitEnabled = false;
std::optional<service::Address> m_ExitNode;
util::Mutex m_SendQueueMutex; // protects m_SendQueue
std::deque<SendEvent_t> m_SendQueue GUARDED_BY(m_SendQueueMutex);

View File

@ -54,7 +54,7 @@ namespace llarp
}
OutboundContext::OutboundContext(const IntroSet& introset, Endpoint* parent)
: path::Builder(parent->Router(), 4, path::default_len)
: path::Builder(parent->Router(), 4, parent->numHops)
, SendContext(introset.A, {}, this, parent)
, location(introset.A.Addr().ToKey())
, currentIntroSet(introset)
@ -271,6 +271,17 @@ namespace llarp
if (m_LookupFails > 16 || m_BuildFails > 10)
return true;
constexpr auto InboundTrafficTimeout = 5s;
if (m_GotInboundTraffic and m_LastInboundTraffic + InboundTrafficTimeout <= now)
{
if (std::chrono::abs(now - lastGoodSend) < InboundTrafficTimeout)
{
// timeout on other side
MarkCurrentIntroBad(now);
}
}
// check for expiration
if (remoteIntro.ExpiresSoon(now))
{
@ -332,12 +343,6 @@ namespace llarp
exclude.insert(m_NextIntro.router);
for (const auto& snode : m_Endpoint->SnodeBlacklist())
exclude.insert(snode);
if (hop == 0)
{
// exclude any exits as our first hop
const auto exits = m_Endpoint->GetExitRouters();
exclude.insert(exits.begin(), exits.end());
}
if (hop == numHops - 1)
{
m_Endpoint->EnsureRouterIsKnown(m_NextIntro.router);
@ -531,9 +536,17 @@ namespace llarp
bool
OutboundContext::HandleHiddenServiceFrame(path::Path_ptr p, const ProtocolFrame& frame)
{
m_LastInboundTraffic = m_Endpoint->Now();
m_GotInboundTraffic = true;
return m_Endpoint->HandleHiddenServiceFrame(p, frame);
}
void
OutboundContext::SendPacketToRemote(const llarp_buffer_t& buf)
{
AsyncEncryptAndSendTo(buf, eProtocolExit);
}
} // namespace service
} // namespace llarp

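The new inbound-traffic check marks the current intro bad when traffic has been received before, nothing has arrived for 5s, yet we ourselves sent recently. A minimal sketch of that heuristic as a standalone predicate, assuming llarp_time_t is the codebase's std::chrono::milliseconds alias (the function name is illustrative):

#include <chrono>

using namespace std::chrono_literals;
using llarp_time_t = std::chrono::milliseconds;  // assumption: matches the codebase alias

// true when the remote probably timed out: we used to receive traffic, nothing
// has arrived for the timeout window, yet we did send something recently
bool
RemoteLooksDead(
    llarp_time_t now, llarp_time_t lastInbound, llarp_time_t lastGoodSend, bool gotInbound)
{
  constexpr auto InboundTrafficTimeout = 5s;
  if (not gotInbound)
    return false;
  if (lastInbound + InboundTrafficTimeout > now)
    return false;  // heard from them recently enough
  return std::chrono::abs(now - lastGoodSend) < InboundTrafficTimeout;
}
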
View File

@ -21,11 +21,15 @@ namespace llarp
public std::enable_shared_from_this<OutboundContext>
{
OutboundContext(const IntroSet& introSet, Endpoint* parent);
~OutboundContext() override;
util::StatusObject
ExtractStatus() const;
void
BlacklistSNode(const RouterID) override{};
bool
ShouldBundleRC() const override;
@ -67,6 +71,10 @@ namespace llarp
bool
ReadyToSend() const;
/// for exits
void
SendPacketToRemote(const llarp_buffer_t&) override;
bool
ShouldBuildMore(llarp_time_t now) const override;
@ -128,6 +136,8 @@ namespace llarp
llarp_time_t lastShift = 0s;
uint16_t m_LookupFails = 0;
uint16_t m_BuildFails = 0;
llarp_time_t m_LastInboundTraffic = 0s;
bool m_GotInboundTraffic = false;
};
} // namespace service

View File

@ -9,5 +9,5 @@ namespace llarp::service
constexpr ProtocolType eProtocolControl = 0UL;
constexpr ProtocolType eProtocolTrafficV4 = 1UL;
constexpr ProtocolType eProtocolTrafficV6 = 2UL;
constexpr ProtocolType eProtocolExit = 3UL;
} // namespace llarp::service

View File

@ -74,7 +74,7 @@ namespace llarp
f->T = currentConvoTag;
f->S = ++sequenceNo;
auto path = m_PathSet->GetNewestPathByRouter(remoteIntro.router);
auto path = m_PathSet->GetRandomPathByRouter(remoteIntro.router);
if (!path)
{
LogError(m_Endpoint->Name(), " cannot encrypt and send: no path for intro ", remoteIntro);

View File

@ -28,14 +28,9 @@ namespace llarp
{
/// aligned buffer that is sz bytes long and aligns to the nearest Alignment
template <size_t sz>
#ifdef _WIN32
// We CANNOT align on a 128-bit boundary, malloc(3C) on win32
// only hands out 64-bit aligned pointers
struct alignas(uint64_t) AlignedBuffer
#else
struct alignas(std::max_align_t) AlignedBuffer
#endif
{
static_assert(alignof(std::max_align_t) <= 16, "insane alignment");
static_assert(
sz >= 8,
"AlignedBuffer cannot be used with buffers smaller than 8 "
@ -276,12 +271,12 @@ namespace llarp
struct Hash
{
size_t
operator()(const AlignedBuffer& buf) const
std::size_t
operator()(const AlignedBuffer& buf) const noexcept
{
size_t hash;
std::memcpy(&hash, buf.data(), sizeof(hash));
return hash;
std::size_t h = 0;
std::memcpy(&h, buf.data(), sizeof(std::size_t));
return h;
}
};

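AlignedBuffer::Hash now zero-initialises its result, copies exactly sizeof(std::size_t) bytes and is noexcept, which is what the standard unordered containers expect of a hasher. A minimal usage sketch, with the header path as an assumption:

#include <unordered_map>

#include <util/aligned.hpp>  // assumption: header defining llarp::AlignedBuffer

// 32-byte buffers as keys of a standard container, hashed by their leading
// sizeof(std::size_t) bytes via the nested Hash functor
using Key = llarp::AlignedBuffer<32>;
using PathCounts = std::unordered_map<Key, int, Key::Hash>;
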
View File

@ -20,7 +20,7 @@ namespace llarp
bool
Contains(const Val_t& v) const
{
return m_Values.find(v) != m_Values.end();
return m_Values.count(v) != 0;
}
/// return true if inserted
@ -30,7 +30,7 @@ namespace llarp
{
if (now == 0s)
now = llarp::time_now_ms();
return m_Values.emplace(v, now).second;
return m_Values.try_emplace(v, now).second;
}
/// decay hashset entries
@ -39,15 +39,7 @@ namespace llarp
{
if (now == 0s)
now = llarp::time_now_ms();
auto itr = m_Values.begin();
while (itr != m_Values.end())
{
if ((m_CacheInterval + itr->second) <= now)
itr = m_Values.erase(itr);
else
++itr;
}
EraseIf([&](const auto& item) { return (m_CacheInterval + item.second) <= now; });
}
Time_t
@ -56,6 +48,12 @@ namespace llarp
return m_CacheInterval;
}
bool
Empty() const
{
return m_Values.empty();
}
void
DecayInterval(Time_t interval)
{
@ -63,6 +61,23 @@ namespace llarp
}
private:
template <typename Predicate_t>
void
EraseIf(Predicate_t pred)
{
for (auto i = m_Values.begin(), last = m_Values.end(); i != last;)
{
if (pred(*i))
{
i = m_Values.erase(i);
}
else
{
++i;
}
}
}
Time_t m_CacheInterval;
std::unordered_map<Val_t, Time_t, Hash_t> m_Values;
};

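Decay is now expressed through a private EraseIf helper and the set gains an Empty() accessor, which the new thrash test below leans on. A minimal sketch of the replay-filter usage pattern, with assumed header paths:

#include <util/decaying_hashset.hpp>  // assumption: header path for DecayingHashSet
#include <router_id.hpp>              // header used by the test below
#include <util/time.hpp>              // assumption: declares llarp::time_now_ms()

using namespace std::chrono_literals;

void
ReplayFilterSketch()
{
  llarp::util::DecayingHashSet<llarp::RouterID> seen(5s);
  const auto now = llarp::time_now_ms();
  const llarp::RouterID zero;       // default-constructed id, as in the unit test below
  if (seen.Insert(zero, now))
  {
    // first sighting inside the decay window; a repeat Insert returns false
  }
  if (not seen.Empty())
    seen.Decay(now + 5s);  // EraseIf drops entries whose age reaches the interval
}
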
View File

@ -80,13 +80,13 @@ namespace llarp
}
std::vector<std::string_view>
split(std::string_view str, char delimiter)
split(const std::string_view str, char delimiter)
{
std::vector<std::string_view> splits;
const auto str_size = str.size();
size_t last = 0;
size_t next = 0;
while (last < str.size() and next < std::string_view::npos)
while (last < str_size and next < std::string_view::npos)
{
next = str.find_first_of(delimiter, last);
if (next > last)
@ -96,7 +96,7 @@ namespace llarp
last = next;
// advance to next non-delimiter
while (str[last] == delimiter)
while (last < str_size and str[last] == delimiter)
last++;
}
else

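split now caches str.size() and bounds-checks before indexing while skipping runs of delimiters. A minimal usage sketch, assuming util/str.hpp declares it; the note about consecutive delimiters follows from the inner loop above:

#include <util/str.hpp>  // assumption: header declaring llarp::split

#include <iostream>
#include <string>

int
main()
{
  // runs of ',' between tokens are skipped by the inner loop above, so the
  // double delimiter here should not yield an empty entry
  for (const auto part : llarp::split("exit,,snode,client", ','))
    std::cout << std::string(part) << '\n';
  return 0;
}
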
View File

@ -42,7 +42,7 @@ namespace llarp
/// @param delimiter is the character to split on
/// @return a vector of std::string_views with the split words, excluding the delimiter
std::vector<std::string_view>
split(std::string_view str, char delimiter);
split(const std::string_view str, char delimiter);
} // namespace llarp

View File

@ -6,92 +6,40 @@
namespace llarp
{
Logic::Logic(size_t sz) : m_Thread(llarp_init_threadpool(1, "llarp-logic", sz))
{
llarp_threadpool_start(m_Thread);
/// set thread id
std::promise<ID_t> result;
// queue setting id and try to get the result back
llarp_threadpool_queue_job(m_Thread, [&]() {
m_ID = std::this_thread::get_id();
result.set_value(*m_ID);
});
// get the result back
ID_t spawned = result.get_future().get();
LogDebug("logic thread spawned on ", spawned);
}
Logic::~Logic()
{
delete m_Thread;
}
size_t
Logic::numPendingJobs() const
{
return m_Thread->pendingJobs();
}
bool
Logic::queue_job(struct llarp_thread_job job)
{
return job.user && job.work && LogicCall(this, std::bind(job.work, job.user));
if (job.user && job.work)
{
LogicCall(this, std::bind(job.work, job.user));
return true;
}
return false;
}
void
Logic::stop()
{
llarp::LogDebug("logic thread stop");
// stop all operations on threadpool
llarp_threadpool_stop(m_Thread);
}
bool
Logic::_traceLogicCall(std::function<void(void)> func, const char* tag, int line)
void
Logic::Call(std::function<void(void)> func)
{
// wrap the function so that we ensure that it's always calling stuff one at
// a time
auto f = [self = this, func]() {
if (self->m_Queue)
{
func();
}
else
{
self->m_Killer.TryAccess(func);
}
};
if (can_flush())
{
f();
return true;
func();
}
if (m_Queue)
else
{
m_Queue(f);
return true;
m_Queue(std::move(func));
}
if (m_Thread->LooksFull(5))
{
LogErrorExplicit(
tag ? tag : LOG_TAG,
line ? line : __LINE__,
"holy crap, we are trying to queue a job "
"onto the logic thread but it looks full");
std::abort();
}
auto ret = llarp_threadpool_queue_job(m_Thread, f);
if (not ret)
{
}
return ret;
}
void
Logic::SetQueuer(std::function<void(std::function<void(void)>)> q)
{
m_Queue = q;
m_Queue = std::move(q);
m_Queue([self = this]() { self->m_ID = std::this_thread::get_id(); });
}

View File

@ -11,10 +11,6 @@ namespace llarp
class Logic
{
public:
Logic(size_t queueLength = size_t{1024 * 8});
~Logic();
/// stop all operation and wait for that to die
void
stop();
@ -22,8 +18,8 @@ namespace llarp
bool
queue_job(struct llarp_thread_job job);
bool
_traceLogicCall(std::function<void(void)> func, const char* filename, int lineo);
void
Call(std::function<void(void)> func);
uint32_t
call_later(llarp_time_t later, std::function<void(void)> func);
@ -34,9 +30,6 @@ namespace llarp
void
remove_call(uint32_t id);
size_t
numPendingJobs() const;
bool
can_flush() const;
@ -51,23 +44,19 @@ namespace llarp
private:
using ID_t = std::thread::id;
llarp_threadpool* const m_Thread;
llarp_ev_loop* m_Loop = nullptr;
std::optional<ID_t> m_ID;
util::ContentionKiller m_Killer;
std::function<void(std::function<void(void)>)> m_Queue;
};
} // namespace llarp
#ifndef LogicCall
#if defined(LOKINET_DEBUG)
#ifdef LOG_TAG
#define LogicCall(l, ...) l->_traceLogicCall(__VA_ARGS__, LOG_TAG, __LINE__)
#else
#define LogicCall(l, ...) l->_traceLogicCall(__VA_ARGS__, __FILE__, __LINE__)
#endif
#else
#define LogicCall(l, ...) l->_traceLogicCall(__VA_ARGS__, 0, 0)
#endif
#endif
/// this used to be a macro
template <typename Logic_ptr, typename Func_t>
static bool
LogicCall(const Logic_ptr& logic, Func_t func)
{
logic->Call(std::move(func));
return true;
}
#endif

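The LogicCall macro is now a small template forwarding to Logic::Call, which runs the callable inline when can_flush() returns true and otherwise hands it to the queuer installed via SetQueuer. A minimal usage sketch, with assumed header paths:

#include <memory>

#include <util/thread/logic.hpp>    // assumption: path of the header shown above
#include <util/logging/logger.hpp>  // assumption: declares llarp::LogInfo

void
QueueHello(const std::shared_ptr<llarp::Logic>& logic)
{
  // Logic::Call runs the callable inline when can_flush() returns true,
  // otherwise it hands it to the queuer installed via SetQueuer
  LogicCall(logic, []() { llarp::LogInfo("hello from the logic thread"); });
}
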
View File

@ -155,6 +155,7 @@ struct LinkLayerTest : public test::LlarpTest<llarp::sodium::CryptoLibSodium>
RouterContact::Lifetime = 500ms;
netLoop = llarp_make_ev_loop();
m_logic.reset(new Logic());
netLoop->set_logic(m_logic);
Alice.Setup();
Bob.Setup();
}

View File

@ -2,11 +2,42 @@
#include <router_id.hpp>
#include <catch2/catch.hpp>
TEST_CASE("Thrash DecayingHashSet", "[decaying-hashset]")
{
static constexpr auto duration = 5s;
static constexpr auto decayInterval = 50ms;
llarp::util::DecayingHashSet<llarp::AlignedBuffer<32>> hashset(decayInterval);
const llarp_time_t started = llarp::time_now_ms();
const auto end = duration + started;
llarp_time_t nextDecay = started + decayInterval;
do
{
const auto now = llarp::time_now_ms();
for (size_t i = 0; i < 500; i++)
{
llarp::AlignedBuffer<32> rando;
rando.Randomize();
hashset.Insert(rando, now);
/// maybe reinsert to simulate filter hits
if (i % 20 == 0)
hashset.Insert(rando, now);
}
if (now >= nextDecay)
{
REQUIRE(not hashset.Empty());
hashset.Decay(now);
nextDecay += decayInterval;
}
} while (llarp::time_now_ms() <= end);
}
TEST_CASE("DecayingHashSet test decay static time", "[decaying-hashset]")
{
static constexpr auto timeout = 5s;
static constexpr auto now = 1s;
llarp::util::DecayingHashSet< llarp::RouterID > hashset(timeout);
static constexpr auto now = 1s;
llarp::util::DecayingHashSet<llarp::RouterID> hashset(timeout);
const llarp::RouterID zero;
REQUIRE(zero.IsZero());
REQUIRE(not hashset.Contains(zero));
@ -23,8 +54,8 @@ TEST_CASE("DecayingHashSet test decay static time", "[decaying-hashset]")
TEST_CASE("DecayingHashSet test decay dynamic time", "[decaying-hashset]")
{
static constexpr llarp_time_t timeout = 5s;
const llarp_time_t now = llarp::time_now_ms();
llarp::util::DecayingHashSet< llarp::RouterID > hashset(timeout);
const llarp_time_t now = llarp::time_now_ms();
llarp::util::DecayingHashSet<llarp::RouterID> hashset(timeout);
const llarp::RouterID zero;
REQUIRE(zero.IsZero());
REQUIRE(not hashset.Contains(zero));