Merge pull request #1809 from necro-nemesis/centos/8

Bump to v0.9.8
This commit is contained in:
Jason Rhinelander 2021-11-17 19:43:47 -04:00 committed by GitHub
commit 5303800846
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
86 changed files with 876 additions and 1439 deletions

View File

@ -22,7 +22,7 @@ endif()
project(lokinet
VERSION 0.9.7
VERSION 0.9.8
DESCRIPTION "lokinet - IP packet onion router"
LANGUAGES ${LANGS})

View File

@ -1,5 +1,5 @@
Name: lokinet
Version: 0.9.7
Version: 0.9.8
Release: 1%{?dist}
Summary: Lokinet anonymous, decentralized overlay network
@ -157,6 +157,9 @@ systemctl enable --now lokinet
%systemd_postun lokinet.service
%changelog
* Wed Nov 17 2021 Technical Tumbleweed <necro_nemesis@hotmail.com> - 0.9.8-1
- bump version
* Wed Oct 20 2021 Technical Tumbleweed <necro_nemesis@hotmail.com> - 0.9.7-1
- bump version

View File

@ -19,10 +19,10 @@ set(EXPAT_SOURCE expat-${EXPAT_VERSION}.tar.xz)
set(EXPAT_HASH SHA512=dde8a9a094b18d795a0e86ca4aa68488b352dc67019e0d669e8b910ed149628de4c2a49bc3a5b832f624319336a01f9e4debe03433a43e1c420f36356d886820
CACHE STRING "expat source hash")
set(UNBOUND_VERSION 1.13.1 CACHE STRING "unbound version")
set(UNBOUND_VERSION 1.13.2 CACHE STRING "unbound version")
set(UNBOUND_MIRROR ${LOCAL_MIRROR} https://nlnetlabs.nl/downloads/unbound CACHE STRING "unbound download mirror(s)")
set(UNBOUND_SOURCE unbound-${UNBOUND_VERSION}.tar.gz)
set(UNBOUND_HASH SHA256=8504d97b8fc5bd897345c95d116e0ee0ddf8c8ff99590ab2b4bd13278c9f50b8
set(UNBOUND_HASH SHA256=0a13b547f3b92a026b5ebd0423f54c991e5718037fd9f72445817f6a040e1a83
CACHE STRING "unbound source hash")
set(SQLITE3_VERSION 3350500 CACHE STRING "sqlite3 version")

View File

@ -1,6 +0,0 @@
ARG ARCH=amd64
FROM ${ARCH}/debian:bullseye
RUN /bin/bash -c 'echo "man-db man-db/auto-update boolean false" | debconf-set-selections'
RUN apt-get -o=Dpkg::Use-Pty=0 -q update \
&& apt-get -o=Dpkg::Use-Pty=0 -q dist-upgrade -y \
&& apt-get -o=Dpkg::Use-Pty=0 -q autoremove -y

View File

@ -1,6 +0,0 @@
ARG ARCH=amd64
FROM ${ARCH}/debian:buster
RUN /bin/bash -c 'echo "man-db man-db/auto-update boolean false" | debconf-set-selections'
RUN apt-get -o=Dpkg::Use-Pty=0 -q update \
&& apt-get -o=Dpkg::Use-Pty=0 -q dist-upgrade -y \
&& apt-get -o=Dpkg::Use-Pty=0 -q autoremove -y

View File

@ -1,6 +0,0 @@
ARG ARCH=amd64
FROM ${ARCH}/debian:sid
RUN /bin/bash -c 'echo "man-db man-db/auto-update boolean false" | debconf-set-selections'
RUN apt-get -o=Dpkg::Use-Pty=0 -q update \
&& apt-get -o=Dpkg::Use-Pty=0 -q dist-upgrade -y \
&& apt-get -o=Dpkg::Use-Pty=0 -q autoremove -y

View File

@ -1,6 +0,0 @@
ARG ARCH=amd64
FROM ${ARCH}/debian:stable
RUN /bin/bash -c 'echo "man-db man-db/auto-update boolean false" | debconf-set-selections'
RUN apt-get -o=Dpkg::Use-Pty=0 -q update \
&& apt-get -o=Dpkg::Use-Pty=0 -q dist-upgrade -y \
&& apt-get -o=Dpkg::Use-Pty=0 -q autoremove -y

View File

@ -1,6 +0,0 @@
ARG ARCH=amd64
FROM ${ARCH}/debian:testing
RUN /bin/bash -c 'echo "man-db man-db/auto-update boolean false" | debconf-set-selections'
RUN apt-get -o=Dpkg::Use-Pty=0 -q update \
&& apt-get -o=Dpkg::Use-Pty=0 -q dist-upgrade -y \
&& apt-get -o=Dpkg::Use-Pty=0 -q autoremove -y

View File

@ -1,39 +0,0 @@
ARG ARCH=amd64
FROM registry.oxen.rocks/lokinet-ci-debian-bullseye-base/${ARCH}
RUN apt-get -o=Dpkg::Use-Pty=0 -q install --no-install-recommends -y \
automake \
ccache \
cmake \
eatmydata \
g++ \
gdb \
git \
libboost-program-options-dev \
libboost-serialization-dev \
libboost-thread-dev \
libcurl4-openssl-dev \
libevent-dev \
libgtest-dev \
libhidapi-dev \
libjemalloc-dev \
libminiupnpc-dev \
libreadline-dev \
libsodium-dev \
libsqlite3-dev \
libssl-dev \
libsystemd-dev \
libtool \
libunbound-dev \
libunwind8-dev \
libusb-1.0.0-dev \
libuv1-dev \
libzmq3-dev \
lsb-release \
make \
nettle-dev \
ninja-build \
openssh-client \
patch \
pkg-config \
python3-dev \
qttools5-dev

View File

@ -1,40 +0,0 @@
ARG ARCH=amd64
FROM registry.oxen.rocks/lokinet-ci-debian-buster-base/${ARCH}
RUN apt-get -o=Dpkg::Use-Pty=0 -q install --no-install-recommends -y \
automake \
ccache \
cmake \
eatmydata \
g++ \
gdb \
git \
libboost-program-options-dev \
libboost-serialization-dev \
libboost-thread-dev \
libcurl4-openssl-dev \
libevent-dev \
libgtest-dev \
libhidapi-dev \
libjemalloc-dev \
libminiupnpc-dev \
libreadline-dev \
libsodium-dev \
libsqlite3-dev \
libssl-dev \
libsystemd-dev \
libtool \
libunbound-dev \
libunwind8-dev \
libusb-1.0.0-dev \
libuv1-dev \
libzmq3-dev \
lsb-release \
make \
nettle-dev \
ninja-build \
openssh-client \
patch \
pkg-config \
python3-dev \
qttools5-dev \
xz-utils

View File

@ -1,39 +0,0 @@
ARG ARCH=amd64
FROM registry.oxen.rocks/lokinet-ci-debian-sid-base/${ARCH}
RUN apt-get -o=Dpkg::Use-Pty=0 --no-install-recommends -q install -y \
automake \
ccache \
cmake \
eatmydata \
g++ \
gdb \
git \
libboost-program-options-dev \
libboost-serialization-dev \
libboost-thread-dev \
libcurl4-openssl-dev \
libevent-dev \
libgtest-dev \
libhidapi-dev \
libjemalloc-dev \
libminiupnpc-dev \
libreadline-dev \
libsodium-dev \
libsqlite3-dev \
libssl-dev \
libsystemd-dev \
libtool \
libunbound-dev \
libunwind8-dev \
libusb-1.0.0-dev \
libuv1-dev \
libzmq3-dev \
lsb-release \
make \
nettle-dev \
ninja-build \
openssh-client \
patch \
pkg-config \
python3-dev \
qttools5-dev

View File

@ -1,39 +0,0 @@
ARG ARCH=amd64
FROM registry.oxen.rocks/lokinet-ci-debian-stable-base/${ARCH}
RUN apt-get -o=Dpkg::Use-Pty=0 -q install --no-install-recommends -y \
automake \
ccache \
cmake \
eatmydata \
g++ \
gdb \
git \
libboost-program-options-dev \
libboost-serialization-dev \
libboost-thread-dev \
libcurl4-openssl-dev \
libevent-dev \
libgtest-dev \
libhidapi-dev \
libjemalloc-dev \
libminiupnpc-dev \
libreadline-dev \
libsodium-dev \
libsqlite3-dev \
libssl-dev \
libsystemd-dev \
libtool \
libunbound-dev \
libunwind8-dev \
libusb-1.0.0-dev \
libuv1-dev \
libzmq3-dev \
lsb-release \
make \
nettle-dev \
ninja-build \
openssh-client \
patch \
pkg-config \
python3-dev \
qttools5-dev

View File

@ -1,31 +0,0 @@
ARG ARCH=amd64
FROM ${ARCH}/ubuntu:bionic
RUN /bin/bash -c 'echo "man-db man-db/auto-update boolean false" | debconf-set-selections'
RUN apt-get -o=Dpkg::Use-Pty=0 -q update \
&& apt-get -o=Dpkg::Use-Pty=0 -q dist-upgrade -y \
&& apt-get -o=Dpkg::Use-Pty=0 -q --no-install-recommends install -y \
autoconf \
automake \
ccache \
cmake \
eatmydata \
file \
g++-8 \
gdb \
git \
gperf \
libjemalloc-dev \
libpgm-dev \
libtool \
libuv1-dev \
libzmq3-dev \
lsb-release \
make \
ninja-build \
openssh-client \
openssh-client \
patch \
pkg-config \
python3-dev \
qttools5-dev \
&& mkdir -p /usr/lib/x86_64-linux-gnu/pgm-5.2/include

View File

@ -1,42 +0,0 @@
ARG ARCH=amd64
FROM ${ARCH}/ubuntu:focal
RUN /bin/bash -c 'echo "man-db man-db/auto-update boolean false" | debconf-set-selections'
RUN apt-get -o=Dpkg::Use-Pty=0 -q update \
&& apt-get -o=Dpkg::Use-Pty=0 -q dist-upgrade -y \
&& apt-get -o=Dpkg::Use-Pty=0 --no-install-recommends -q install -y \
automake \
ccache \
cmake \
eatmydata \
g++ \
gdb \
git \
libboost-program-options-dev \
libboost-serialization-dev \
libboost-thread-dev \
libcurl4-openssl-dev \
libevent-dev \
libgtest-dev \
libhidapi-dev \
libjemalloc-dev \
libminiupnpc-dev \
libreadline-dev \
libsodium-dev \
libsqlite3-dev \
libssl-dev \
libsystemd-dev \
libtool \
libunbound-dev \
libunwind8-dev \
libusb-1.0.0-dev \
libuv1-dev \
libzmq3-dev \
lsb-release \
make \
nettle-dev \
ninja-build \
openssh-client \
patch \
pkg-config \
python3-dev \
qttools5-dev

View File

@ -1,42 +0,0 @@
ARG ARCH=amd64
FROM ${ARCH}/ubuntu:impish
RUN /bin/bash -c 'echo "man-db man-db/auto-update boolean false" | debconf-set-selections'
RUN apt-get -o=Dpkg::Use-Pty=0 -q update \
&& apt-get -o=Dpkg::Use-Pty=0 -q dist-upgrade -y \
&& apt-get -o=Dpkg::Use-Pty=0 --no-install-recommends -q install -y \
automake \
ccache \
cmake \
eatmydata \
g++ \
gdb \
git \
libboost-program-options-dev \
libboost-serialization-dev \
libboost-thread-dev \
libcurl4-openssl-dev \
libevent-dev \
libgtest-dev \
libhidapi-dev \
libjemalloc-dev \
libminiupnpc-dev \
libreadline-dev \
libsodium-dev \
libsqlite3-dev \
libssl-dev \
libsystemd-dev \
libtool \
libunbound-dev \
libunwind8-dev \
libusb-1.0.0-dev \
libuv1-dev \
libzmq3-dev \
lsb-release \
make \
nettle-dev \
ninja-build \
openssh-client \
patch \
pkg-config \
python3-dev \
qttools5-dev

View File

@ -1,42 +0,0 @@
ARG ARCH=amd64
FROM ${ARCH}/ubuntu:latest
RUN /bin/bash -c 'echo "man-db man-db/auto-update boolean false" | debconf-set-selections'
RUN apt-get -o=Dpkg::Use-Pty=0 -q update \
&& apt-get -o=Dpkg::Use-Pty=0 -q dist-upgrade -y \
&& apt-get -o=Dpkg::Use-Pty=0 --no-install-recommends -q install -y \
automake \
ccache \
cmake \
eatmydata \
g++ \
gdb \
git \
libboost-program-options-dev \
libboost-serialization-dev \
libboost-thread-dev \
libcurl4-openssl-dev \
libevent-dev \
libgtest-dev \
libhidapi-dev \
libjemalloc-dev \
libminiupnpc-dev \
libreadline-dev \
libsodium-dev \
libsqlite3-dev \
libssl-dev \
libsystemd-dev \
libtool \
libunbound-dev \
libunwind8-dev \
libusb-1.0.0-dev \
libuv1-dev \
libzmq3-dev \
lsb-release \
make \
nettle-dev \
ninja-build \
openssh-client \
patch \
pkg-config \
python3-dev \
qttools5-dev

View File

@ -1,42 +0,0 @@
ARG ARCH=amd64
FROM ${ARCH}/ubuntu:rolling
RUN /bin/bash -c 'echo "man-db man-db/auto-update boolean false" | debconf-set-selections'
RUN apt-get -o=Dpkg::Use-Pty=0 -q update \
&& apt-get -o=Dpkg::Use-Pty=0 -q dist-upgrade -y \
&& apt-get -o=Dpkg::Use-Pty=0 --no-install-recommends -q install -y \
automake \
ccache \
cmake \
eatmydata \
g++ \
gdb \
git \
libboost-program-options-dev \
libboost-serialization-dev \
libboost-thread-dev \
libcurl4-openssl-dev \
libevent-dev \
libgtest-dev \
libhidapi-dev \
libjemalloc-dev \
libminiupnpc-dev \
libreadline-dev \
libsodium-dev \
libsqlite3-dev \
libssl-dev \
libsystemd-dev \
libtool \
libunbound-dev \
libunwind8-dev \
libusb-1.0.0-dev \
libuv1-dev \
libzmq3-dev \
lsb-release \
make \
nettle-dev \
ninja-build \
openssh-client \
patch \
pkg-config \
python3-dev \
qttools5-dev

View File

@ -1,10 +0,0 @@
ARG ARCH=amd64
FROM registry.oxen.rocks/lokinet-ci-debian-bullseye/${ARCH}
RUN apt-get -o=Dpkg::Use-Pty=0 --no-install-recommends -q install -y \
ccache \
debhelper \
devscripts \
equivs \
git \
git-buildpackage \
python3-dev

View File

@ -1,10 +0,0 @@
ARG ARCH=amd64
FROM registry.oxen.rocks/lokinet-ci-debian-sid/${ARCH}
RUN apt-get -o=Dpkg::Use-Pty=0 --no-install-recommends -q install -y \
ccache \
debhelper \
devscripts \
equivs \
git \
git-buildpackage \
python3-dev

View File

@ -1,10 +0,0 @@
ARG ARCH=amd64
FROM registry.oxen.rocks/lokinet-ci-ubuntu-focal/${ARCH}
RUN apt-get -o=Dpkg::Use-Pty=0 --no-install-recommends -q install -y \
ccache \
debhelper \
devscripts \
equivs \
git \
git-buildpackage \
python3-dev

View File

@ -1,10 +0,0 @@
ARG ARCH=amd64
FROM registry.oxen.rocks/lokinet-ci-ubuntu-impish/${ARCH}
RUN apt-get -o=Dpkg::Use-Pty=0 --no-install-recommends -q install -y \
ccache \
debhelper \
devscripts \
equivs \
git \
git-buildpackage \
python3-dev

View File

@ -1,23 +0,0 @@
ARG ARCH=amd64
FROM registry.oxen.rocks/lokinet-ci-debian-testing-base/${ARCH}
RUN /bin/bash -c 'sed -i "s/main/main contrib/g" /etc/apt/sources.list'
RUN apt-get -o=Dpkg::Use-Pty=0 -q update \
&& apt-get -o=Dpkg::Use-Pty=0 -q install --no-install-recommends -y \
android-sdk \
automake \
ccache \
cmake \
curl \
git \
google-android-ndk-installer \
libtool \
make \
openssh-client \
patch \
pkg-config \
wget \
xz-utils \
zip \
&& git clone https://github.com/Shadowstyler/android-sdk-licenses.git /tmp/android-sdk-licenses \
&& cp -a /tmp/android-sdk-licenses/*-license /usr/lib/android-sdk/licenses \
&& rm -rf /tmp/android-sdk-licenses

View File

@ -1 +0,0 @@
../00-debian-buster-base.dockerfile

View File

@ -1 +0,0 @@
../00-debian-stable-base.dockerfile

View File

@ -1 +0,0 @@
../10-debian-buster.dockerfile

View File

@ -1 +0,0 @@
../10-debian-stable.dockerfile

View File

@ -1 +0,0 @@
../00-debian-sid-base.dockerfile

View File

@ -1 +0,0 @@
../10-debian-sid.dockerfile

View File

@ -1 +0,0 @@
../40-debian-sid-debhelper.dockerfile

View File

@ -1,7 +0,0 @@
ARG ARCH=amd64
FROM registry.oxen.rocks/lokinet-ci-debian-sid/${ARCH}
RUN apt-get -o=Dpkg::Use-Pty=0 -q install --no-install-recommends -y \
clang-13 \
libc++-13-dev \
libc++abi-13-dev \
lld-13

View File

@ -1,24 +0,0 @@
ARG ARCH=amd64
FROM registry.oxen.rocks/lokinet-ci-debian-testing-base/${ARCH}
RUN apt-get -o=Dpkg::Use-Pty=0 -q install --no-install-recommends -y \
autoconf \
automake \
build-essential \
ccache \
cmake \
eatmydata \
file \
g++-mingw-w64-x86-64-posix \
git \
gperf \
libtool \
make \
ninja-build \
nsis \
openssh-client \
patch \
pkg-config \
qttools5-dev \
zip \
&& update-alternatives --set x86_64-w64-mingw32-gcc /usr/bin/x86_64-w64-mingw32-gcc-posix \
&& update-alternatives --set x86_64-w64-mingw32-g++ /usr/bin/x86_64-w64-mingw32-g++-posix

View File

@ -1,7 +0,0 @@
ARG ARCH=amd64
FROM registry.oxen.rocks/lokinet-ci-android/${ARCH}
RUN cd /opt \
&& curl https://storage.googleapis.com/flutter_infra_release/releases/stable/linux/flutter_linux_2.2.2-stable.tar.xz \
| tar xJv \
&& ln -s /opt/flutter/bin/flutter /usr/local/bin/ \
&& flutter precache

View File

@ -1 +0,0 @@
../00-debian-stable-base.dockerfile

View File

@ -1 +0,0 @@
../10-debian-stable.dockerfile

View File

@ -1,7 +0,0 @@
ARG ARCH=amd64
FROM registry.oxen.rocks/lokinet-ci-debian-sid-base/${ARCH}
RUN apt-get -o=Dpkg::Use-Pty=0 -q install --no-install-recommends -y \
clang-format-11 \
eatmydata \
git \
jsonnet

View File

@ -1,17 +0,0 @@
FROM node:14.16.1
RUN /bin/bash -c 'echo "man-db man-db/auto-update boolean false" | debconf-set-selections'
RUN apt-get -o=Dpkg::Use-Pty=0 -q update \
&& apt-get -o=Dpkg::Use-Pty=0 -q dist-upgrade -y \
&& apt-get -o=Dpkg::Use-Pty=0 -q install --no-install-recommends -y \
ccache \
cmake \
eatmydata \
g++ \
gdb \
git \
make \
ninja-build \
openssh-client \
patch \
pkg-config \
wine

View File

@ -3,11 +3,7 @@
To rebuild all ci images and push them to the oxen registry server do:
$ docker login registry.oxen.rocks
$ ./rebuild-docker-images.sh
$ ./rebuild-docker-images.py
If you aren't part of the Oxen team, you'll likely need to set up your own registry and change
registry.oxen.rocks to your own domain name in order to do anything useful with this.
The docker images will be `registry.oxen.rocks/lokinet-ci-*`for each \*.dockerfile in this
directory, with the leading numeric `NN-` removed, if present (so that you can ensure proper
ordering using two-digit numeric prefixes).

View File

@ -0,0 +1,350 @@
#!/usr/bin/env python3
import subprocess
import tempfile
import optparse
import sys
from concurrent.futures import ThreadPoolExecutor
import threading
parser = optparse.OptionParser()
parser.add_option("--no-cache", action="store_true",
help="Run `docker build` with the `--no-cache` option to ignore existing images")
parser.add_option("--parallel", "-j", type="int", default=1,
help="Run up to this many builds in parallel")
parser.add_option("--distro", type="string", default="",
help="Build only this distro; should be DISTRO-CODE or DISTRO-CODE/ARCH, "
"e.g. debian-sid/amd64")
(options, args) = parser.parse_args()
registry_base = 'registry.oxen.rocks/lokinet-ci-'
distros = [*(('debian', x) for x in ('sid', 'stable', 'testing', 'bullseye', 'buster')),
*(('ubuntu', x) for x in ('rolling', 'lts', 'impish', 'hirsute', 'focal', 'bionic'))]
if options.distro:
d = options.distro.split('-')
if len(d) != 2 or d[0] not in ('debian', 'ubuntu') or not d[1]:
print("Bad --distro value '{}'".format(options.distro), file=sys.stderr)
sys.exit(1)
distros = [(d[0], d[1].split('/')[0])]
manifests = {} # "image:latest": ["image/amd64", "image/arm64v8", ...]
manifestlock = threading.Lock()
def arches(distro):
if options.distro and '/' in options.distro:
arch = options.distro.split('/')
if arch not in ('amd64', 'i386', 'arm64v8', 'arm32v7'):
print("Bad --distro value '{}'".format(options.distro), file=sys.stderr)
sys.exit(1)
return [arch]
a = ['amd64', 'arm64v8', 'arm32v7']
if distro[0] == 'debian' or distro == ('ubuntu', 'bionic'):
a.append('i386') # i386 builds don't work on later ubuntu
return a
hacks = {
registry_base + 'ubuntu-bionic-builder': """g++-8 \
&& mkdir -p /usr/lib/x86_64-linux-gnu/pgm-5.2/include""",
}
failure = False
lineno = 0
linelock = threading.Lock()
def print_line(myline, value):
linelock.acquire()
global lineno
if sys.__stdout__.isatty():
jump = lineno - myline
print("\033[{jump}A\r\033[K{value}\033[{jump}B\r".format(jump=jump, value=value), end='')
sys.stdout.flush()
else:
print(value)
linelock.release()
def run_or_report(*args, myline):
try:
subprocess.run(
args, check=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, encoding='utf8')
except subprocess.CalledProcessError as e:
with tempfile.NamedTemporaryFile(suffix=".log", delete=False) as log:
log.write("Error running {}: {}\n\nOutput:\n\n".format(' '.join(args), e).encode())
log.write(e.output.encode())
global failure
failure = True
print_line(myline, "\033[31;1mError! See {} for details", log.name)
raise e
def build_tag(tag_base, arch, contents):
if failure:
raise ChildProcessError()
linelock.acquire()
global lineno
myline = lineno
lineno += 1
print()
linelock.release()
with tempfile.NamedTemporaryFile() as dockerfile:
dockerfile.write(contents.encode())
dockerfile.flush()
tag = '{}/{}'.format(tag_base, arch)
print_line(myline, "\033[33;1mRebuilding \033[35;1m{}\033[0m".format(tag))
run_or_report('docker', 'build', '--pull', '-f', dockerfile.name, '-t', tag,
*(('--no-cache',) if options.no_cache else ()), '.', myline=myline)
print_line(myline, "\033[33;1mPushing \033[35;1m{}\033[0m".format(tag))
run_or_report('docker', 'push', tag, myline=myline)
print_line(myline, "\033[32;1mFinished build \033[35;1m{}\033[0m".format(tag))
latest = tag_base + ':latest'
global manifests
manifestlock.acquire()
if latest in manifests:
manifests[latest].append(tag)
else:
manifests[latest] = [tag]
manifestlock.release()
def base_distro_build(distro, arch):
tag = '{r}{distro[0]}-{distro[1]}-base'.format(r=registry_base, distro=distro)
codename = 'latest' if distro == ('ubuntu', 'lts') else distro[1]
build_tag(tag, arch, """
FROM {}/{}:{}
RUN /bin/bash -c 'echo "man-db man-db/auto-update boolean false" | debconf-set-selections'
RUN apt-get -o=Dpkg::Use-Pty=0 -q update \
&& apt-get -o=Dpkg::Use-Pty=0 -q dist-upgrade -y \
&& apt-get -o=Dpkg::Use-Pty=0 -q autoremove -y \
{hacks}
""".format(arch, distro[0], codename, hacks=hacks.get(tag, '')))
def distro_build(distro, arch):
prefix = '{r}{distro[0]}-{distro[1]}'.format(r=registry_base, distro=distro)
fmtargs = dict(arch=arch, distro=distro, prefix=prefix)
# (distro)-(codename)-base: Base image from upstream: we sync the repos, but do nothing else.
if (distro, arch) != (('debian', 'stable'), 'amd64'): # debian-stable-base/amd64 already built
base_distro_build(distro, arch)
# (distro)-(codename)-builder: Deb builder image used for building debs; we add the basic tools
# we use to build debs, not including things that should come from the dependencies in the
# debian/control file.
build_tag(prefix + '-builder', arch, """
FROM {prefix}-base/{arch}
RUN apt-get -o=Dpkg::Use-Pty=0 -q update \
&& apt-get -o=Dpkg::Use-Pty=0 -q dist-upgrade -y \
&& apt-get -o=Dpkg::Use-Pty=0 --no-install-recommends -q install -y \
ccache \
devscripts \
equivs \
g++ \
git \
git-buildpackage \
openssh-client \
{hacks}
""".format(**fmtargs, hacks=hacks.get(prefix + '-builder', '')))
# (distro)-(codename): Basic image we use for most builds. This takes the -builder and adds
# most dependencies found in our packages.
build_tag(prefix, arch, """
FROM {prefix}-builder/{arch}
RUN apt-get -o=Dpkg::Use-Pty=0 -q update \
&& apt-get -o=Dpkg::Use-Pty=0 -q dist-upgrade -y \
&& apt-get -o=Dpkg::Use-Pty=0 --no-install-recommends -q install -y \
automake \
ccache \
cmake \
eatmydata \
g++ \
gdb \
git \
libboost-program-options-dev \
libboost-serialization-dev \
libboost-thread-dev \
libcurl4-openssl-dev \
libevent-dev \
libgtest-dev \
libhidapi-dev \
libjemalloc-dev \
libminiupnpc-dev \
libreadline-dev \
libsodium-dev \
libsqlite3-dev \
libssl-dev \
libsystemd-dev \
libtool \
libunbound-dev \
libunwind8-dev \
libusb-1.0.0-dev \
libuv1-dev \
libzmq3-dev \
lsb-release \
make \
nettle-dev \
ninja-build \
openssh-client \
patch \
pkg-config \
pybind11-dev \
python3-dev \
python3-pip \
python3-pybind11 \
python3-pytest \
python3-setuptools \
qttools5-dev \
{hacks}
""".format(**fmtargs, hacks=hacks.get(prefix, '')))
# Android and flutter builds on top of debian-stable-base and adds a ton of android crap; we
# schedule this job as soon as the debian-sid-base/amd64 build finishes, because they easily take
# the longest and are by far the biggest images.
def android_builds():
build_tag(registry_base + 'android', 'amd64', """
FROM {r}debian-stable-base
RUN /bin/bash -c 'sed -i "s/main/main contrib/g" /etc/apt/sources.list'
RUN apt-get -o=Dpkg::Use-Pty=0 -q update \
&& apt-get -o=Dpkg::Use-Pty=0 -q dist-upgrade -y \
&& apt-get -o=Dpkg::Use-Pty=0 -q install --no-install-recommends -y \
android-sdk \
automake \
ccache \
cmake \
curl \
git \
google-android-ndk-installer \
libtool \
make \
openssh-client \
patch \
pkg-config \
wget \
xz-utils \
zip \
&& git clone https://github.com/Shadowstyler/android-sdk-licenses.git /tmp/android-sdk-licenses \
&& cp -a /tmp/android-sdk-licenses/*-license /usr/lib/android-sdk/licenses \
&& rm -rf /tmp/android-sdk-licenses
""".format(r=registry_base))
build_tag(registry_base + 'flutter', 'amd64', """
FROM {r}android
RUN cd /opt \
&& curl https://storage.googleapis.com/flutter_infra_release/releases/stable/linux/flutter_linux_2.2.2-stable.tar.xz \
| tar xJv \
&& ln -s /opt/flutter/bin/flutter /usr/local/bin/ \
&& flutter precache
""".format(r=registry_base))
# lint is a tiny build (on top of debian-stable-base) with just formatting checking tools
def lint_build():
build_tag(registry_base + 'lint', 'amd64', """
FROM {r}debian-stable-base
RUN apt-get -o=Dpkg::Use-Pty=0 -q install --no-install-recommends -y \
clang-format-11 \
eatmydata \
git \
jsonnet
""".format(r=registry_base))
def nodejs_build():
build_tag(registry_base + 'nodejs', 'amd64', """
FROM node:14.16.1
RUN /bin/bash -c 'echo "man-db man-db/auto-update boolean false" | debconf-set-selections'
RUN apt-get -o=Dpkg::Use-Pty=0 -q update \
&& apt-get -o=Dpkg::Use-Pty=0 -q dist-upgrade -y \
&& apt-get -o=Dpkg::Use-Pty=0 -q install --no-install-recommends -y \
ccache \
cmake \
eatmydata \
g++ \
gdb \
git \
make \
ninja-build \
openssh-client \
patch \
pkg-config \
wine
""")
# Start debian-stable-base/amd64 on its own, because other builds depend on it and we want to get
# those (especially android/flutter) fired off as soon as possible (because it's slow and huge).
if ('debian', 'stable') in distros:
base_distro_build(['debian', 'stable'], 'amd64')
executor = ThreadPoolExecutor(max_workers=max(options.parallel, 1))
if options.distro:
jobs = []
else:
jobs = [executor.submit(b) for b in (android_builds, lint_build, nodejs_build)]
for d in distros:
for a in arches(d):
jobs.append(executor.submit(distro_build, d, a))
while len(jobs):
j = jobs.pop(0)
try:
j.result()
except (ChildProcessError, subprocess.CalledProcessError):
for k in jobs:
k.cancel()
if failure:
print("Error(s) occured, aborting!", file=sys.stderr)
sys.exit(1)
print("\n\n\033[32;1mAll builds finished successfully; pushing manifests...\033[0m\n")
def push_manifest(latest, tags):
if failure:
raise ChildProcessError()
linelock.acquire()
global lineno
myline = lineno
lineno += 1
print()
linelock.release()
subprocess.run(['docker', 'manifest', 'rm', latest], stderr=subprocess.DEVNULL, check=False)
print_line(myline, "\033[33;1mCreating manifest \033[35;1m{}\033[0m".format(latest))
run_or_report('docker', 'manifest', 'create', latest, *tags, myline=myline)
print_line(myline, "\033[33;1mPushing manifest \033[35;1m{}\033[0m".format(latest))
run_or_report('docker', 'manifest', 'push', latest, myline=myline)
print_line(myline, "\033[32;1mFinished manifest \033[35;1m{}\033[0m".format(latest))
for latest, tags in manifests.items():
jobs.append(executor.submit(push_manifest, latest, tags))
while len(jobs):
j = jobs.pop(0)
try:
j.result()
except (ChildProcessError, subprocess.CalledProcessError):
for k in jobs:
k.cancel()
print("\n\n\033[32;1mAll done!\n")

View File

@ -1,44 +0,0 @@
#!/bin/bash
set -o errexit
trap 'echo -e "\n\n\n\e[31;1mAn error occurred!\e[1m\n\n"' ERR
registry=registry.oxen.rocks
if [[ $# -eq 0 ]]; then
files=(*.dockerfile i386/*.dockerfile arm64v8/*.dockerfile arm32v7/*.dockerfile)
else
files=("$@")
fi
declare -A manifests
for file in "${files[@]}"; do
if [[ "$file" == */* ]]; then
arch="${file%%/*}"
name="${file#*/}"
else
arch="amd64"
name="$file"
fi
name="${name#[0-9][0-9]-}" # s/^\d\d-//
name="${name%.dockerfile}" # s/\.dockerfile$//
namearch=$registry/lokinet-ci-$name/$arch
latest=$registry/lokinet-ci-$name:latest
echo -e "\e[32;1mrebuilding \e[35;1m$namearch\e[0m"
docker build --pull -f $file -t $namearch --build-arg ARCH=$arch .
docker push $namearch
manifests[$latest]="${manifests[$latest]} $namearch"
done
for latest in "${!manifests[@]}"; do
echo -e "\e[32;1mpushing new manifest for \e[33;1m$latest[\e[35;1m${manifests[$latest]} \e[33;1m]\e[0m"
docker manifest rm $latest 2>/dev/null || true
docker manifest create $latest ${manifests[$latest]}
docker manifest push $latest
done
echo -e "\n\n\n\e[32;1mAll done!\e[1m\n\n"

View File

@ -44,7 +44,8 @@ elif [ -e lokinet.apk ] ; then
archive="$base.apk"
cp -av lokinet.apk "$archive"
else
cp -av daemon/lokinet daemon/lokinet-vpn daemon/lokinet-bootstrap "$base"
cp -av daemon/lokinet daemon/lokinet-vpn "$base"
cp -av ../contrib/bootstrap/mainnet.signed "$base/bootstrap.signed"
# tar dat shiz up yo
archive="$base.tar.xz"
tar cJvf "$archive" "$base"

View File

@ -109,7 +109,7 @@ namespace llarp::consensus
std::shuffle(testing_queue.begin(), testing_queue.end(), rng);
// Recurse with the rebuild list, but don't let it try rebuilding again
// Recurse with the rebuilt list, but don't let it try rebuilding again
return next_random(router, now, false);
}
@ -124,7 +124,8 @@ namespace llarp::consensus
auto& [pk, retest_time, failures] = failing_queue.top();
if (retest_time > now)
break;
result.emplace_back(pk, failures);
if (failing.count(pk))
result.emplace_back(pk, failures);
failing_queue.pop();
}
return result;

View File

@ -80,7 +80,7 @@ namespace llarp
AsyncDecrypt(const EncryptedFrame& frame, User_ptr u, WorkerFunction_t worker)
{
target = frame;
worker(std::bind(&AsyncFrameDecrypter<User>::Decrypt, this, std::move(u)));
worker([this, u = std::move(u)]() mutable { Decrypt(std::move(u)); });
}
};
} // namespace llarp

View File

@ -30,14 +30,13 @@ namespace llarp
llarp::LogDebug("got ", valuesFound.size(), " routers from exploration");
auto router = parent->GetRouter();
using std::placeholders::_1;
for (const auto& pk : valuesFound)
{
// lookup router
if (router and router->nodedb()->Has(pk))
continue;
parent->LookupRouter(
pk, std::bind(&AbstractRouter::HandleDHTLookupForExplore, router, pk, _1));
pk, [router, pk](const auto& res) { router->HandleDHTLookupForExplore(pk, res); });
}
}
} // namespace dht

View File

@ -75,17 +75,11 @@ namespace llarp
virtual bool
running() const = 0;
// Returns a current steady clock time value representing the current time with event loop tick
// granularity. That is, the value is typically only updated at the beginning of an event loop
// tick.
virtual llarp_time_t
time_now() const
{
return llarp::time_now_ms();
}
// Triggers an event loop wakeup; use when something has been done that requires the event loop
// to wake up (e.g. adding to queues). This is called implicitly by call() and call_soon().
// Idempotent and thread-safe.
virtual void
wakeup() = 0;
time_now() const = 0;
// Calls a function/lambda/etc. If invoked from within the event loop itself this calls the
// given lambda immediately; otherwise it passes it to `call_soon()` to be queued to run at the
@ -196,10 +190,6 @@ namespace llarp
virtual std::shared_ptr<UDPHandle>
make_udp(UDPReceiveFunc on_recv) = 0;
/// set the function that is called once per cycle the flush all the queues
virtual void
set_pump_function(std::function<void(void)> pumpll) = 0;
/// Make a thread-safe event loop waker (an "async" in libuv terminology) on this event loop;
/// you can call `->Trigger()` on the returned shared pointer to fire the callback at the next
/// available event loop iteration. (Multiple Trigger calls invoked before the call is actually
@ -227,6 +217,13 @@ namespace llarp
{
return nullptr;
}
protected:
// Triggers an event loop wakeup; use when something has been done that requires the event loop
// to wake up (e.g. adding to queues). This is called implicitly by call() and call_soon().
// Idempotent and thread-safe.
virtual void
wakeup() = 0;
};
using EventLoop_ptr = std::shared_ptr<EventLoop>;

View File

@ -111,13 +111,12 @@ namespace llarp::uv
{
llarp::LogTrace("ticking event loop.");
FlushLogic();
PumpLL();
auto& log = llarp::LogContext::Instance();
if (log.logStream)
log.logStream->Tick(time_now());
}
Loop::Loop(size_t queue_size) : llarp::EventLoop{}, PumpLL{[] {}}, m_LogicCalls{queue_size}
Loop::Loop(size_t queue_size) : llarp::EventLoop{}, m_LogicCalls{queue_size}
{
if (!(m_Impl = uvw::Loop::create()))
throw std::runtime_error{"Failed to construct libuv loop"};
@ -161,12 +160,6 @@ namespace llarp::uv
m_WakeUp->send();
}
void
Loop::set_pump_function(std::function<void(void)> pump)
{
PumpLL = std::move(pump);
}
std::shared_ptr<llarp::UDPHandle>
Loop::make_udp(UDPReceiveFunc on_recv)
{

View File

@ -31,8 +31,11 @@ namespace llarp::uv
bool
running() const override;
void
wakeup() override;
llarp_time_t
time_now() const override
{
return m_Impl->now();
}
void
call_later(llarp_time_t delay_ms, std::function<void(void)> callback) override;
@ -54,9 +57,6 @@ namespace llarp::uv
void
call_soon(std::function<void(void)> f) override;
void
set_pump_function(std::function<void(void)> pumpll) override;
std::shared_ptr<llarp::EventLoopWakeup>
make_waker(std::function<void()> callback) override;
@ -69,8 +69,6 @@ namespace llarp::uv
void
FlushLogic();
std::function<void(void)> PumpLL;
std::shared_ptr<uvw::Loop>
MaybeGetUVWLoop() override;
@ -95,6 +93,9 @@ namespace llarp::uv
std::unordered_map<int, std::shared_ptr<uvw::PollHandle>> m_Polls;
std::optional<std::thread::id> m_EventLoopThreadID;
void
wakeup() override;
};
} // namespace llarp::uv

View File

@ -137,6 +137,9 @@ namespace llarp
void
BaseSession::CallPendingCallbacks(bool success)
{
if (m_PendingCallbacks.empty())
return;
if (success)
{
auto self = shared_from_this();
@ -288,9 +291,8 @@ namespace llarp
auto path = PickEstablishedPath(llarp::path::ePathRoleExit);
if (path)
{
for (auto& item : m_Upstream)
for (auto& [i, queue] : m_Upstream)
{
auto& queue = item.second;
while (queue.size())
{
auto& msg = queue.front();
@ -305,8 +307,8 @@ namespace llarp
if (m_Upstream.size())
llarp::LogWarn("no path for exit session");
// discard upstream
for (auto& item : m_Upstream)
item.second.clear();
for (auto& [i, queue] : m_Upstream)
queue.clear();
m_Upstream.clear();
if (numHops == 1)
{

View File

@ -92,8 +92,7 @@ namespace llarp
}
return std::nullopt;
}
else
return std::nullopt;
return std::nullopt;
}
const EventLoop_ptr&
@ -112,16 +111,13 @@ namespace llarp
return false;
if (auto* rid = std::get_if<RouterID>(&*maybeAddr))
{
auto range = m_ActiveExits.equal_range(PubKey{*rid});
auto itr = range.first;
while (itr != range.second)
for (auto [itr, end] = m_ActiveExits.equal_range(PubKey{*rid}); itr != end; ++itr)
{
if (not itr->second->LooksDead(Now()))
{
if (itr->second->QueueInboundTraffic(ManagedBuffer{payload}, type))
return true;
}
++itr;
}
if (not m_Router->PathToRouterAllowed(*rid))
@ -136,8 +132,7 @@ namespace llarp
}
return true;
}
else
return false;
return false;
}
bool
@ -357,13 +352,9 @@ namespace llarp
ExitEndpoint::VisitEndpointsFor(
const PubKey& pk, std::function<bool(exit::Endpoint* const)> visit) const
{
auto range = m_ActiveExits.equal_range(pk);
auto itr = range.first;
while (itr != range.second)
for (auto [itr, end] = m_ActiveExits.equal_range(pk); itr != end; ++itr)
{
if (visit(itr->second.get()))
++itr;
else
if (not visit(itr->second.get()))
return true;
}
return false;
@ -422,27 +413,18 @@ namespace llarp
" as we have no working endpoints");
}
});
for (auto& [pubkey, endpoint] : m_ActiveExits)
{
auto itr = m_ActiveExits.begin();
while (itr != m_ActiveExits.end())
if (!endpoint->Flush())
{
if (!itr->second->Flush())
{
LogWarn("exit session with ", itr->first, " dropped packets");
}
++itr;
LogWarn("exit session with ", pubkey, " dropped packets");
}
}
for (auto& [id, session] : m_SNodeSessions)
{
auto itr = m_SNodeSessions.begin();
while (itr != m_SNodeSessions.end())
{
itr->second->FlushUpstream();
itr->second->FlushDownstream();
++itr;
}
session->FlushUpstream();
session->FlushDownstream();
}
m_Router->PumpLL();
}
bool
@ -558,15 +540,13 @@ namespace llarp
// find oldest activity ip address
huint128_t found = {0};
llarp_time_t min = std::numeric_limits<llarp_time_t>::max();
auto itr = m_IPActivity.begin();
while (itr != m_IPActivity.end())
for (const auto& [addr, time] : m_IPActivity)
{
if (itr->second < min)
if (time < min)
{
found.h = itr->first.h;
min = itr->second;
found.h = addr.h;
min = time;
}
++itr;
}
// kick old ident off exit
// TODO: DoS
@ -620,9 +600,9 @@ namespace llarp
ExitEndpoint::AllRemoteEndpoints() const
{
std::unordered_set<AddressVariant_t> remote;
for (auto itr = m_Paths.begin(); itr != m_Paths.end(); ++itr)
for (const auto& [path, pubkey] : m_Paths)
{
remote.insert(RouterID{itr->second});
remote.insert(RouterID{pubkey});
}
return remote;
}
@ -640,9 +620,7 @@ namespace llarp
huint128_t ip = m_KeyToIP[pk];
m_KeyToIP.erase(pk);
m_IPToKey.erase(ip);
auto range = m_ActiveExits.equal_range(pk);
auto exit_itr = range.first;
while (exit_itr != range.second)
for (auto [exit_itr, end] = m_ActiveExits.equal_range(pk); exit_itr != end;)
exit_itr = m_ActiveExits.erase(exit_itr);
}
@ -677,19 +655,14 @@ namespace llarp
{
exit::Endpoint* endpoint = nullptr;
PubKey pk;
{
auto itr = m_Paths.find(path);
if (itr == m_Paths.end())
return nullptr;
if (auto itr = m_Paths.find(path); itr != m_Paths.end())
pk = itr->second;
}
else
return nullptr;
if (auto itr = m_ActiveExits.find(pk); itr != m_ActiveExits.end())
{
auto itr = m_ActiveExits.find(pk);
if (itr != m_ActiveExits.end())
{
if (itr->second->PubKey() == pk)
endpoint = itr->second.get();
}
if (itr->second->PubKey() == pk)
endpoint = itr->second.get();
}
return endpoint;
}
@ -698,8 +671,7 @@ namespace llarp
ExitEndpoint::UpdateEndpointPath(const PubKey& remote, const PathID_t& next)
{
// check if already mapped
auto itr = m_Paths.find(next);
if (itr != m_Paths.end())
if (auto itr = m_Paths.find(next); itr != m_Paths.end())
return false;
m_Paths.emplace(next, remote);
return true;
@ -780,7 +752,7 @@ namespace llarp
{
auto session = std::make_shared<exit::SNodeSession>(
other,
std::bind(&ExitEndpoint::QueueSNodePacket, this, std::placeholders::_1, ip),
[this, ip](const auto& buf) { return QueueSNodePacket(buf, ip); },
GetRouter(),
2,
1,
@ -837,18 +809,14 @@ namespace llarp
void
ExitEndpoint::RemoveExit(const exit::Endpoint* ep)
{
auto range = m_ActiveExits.equal_range(ep->PubKey());
auto itr = range.first;
while (itr != range.second)
for (auto [itr, end] = m_ActiveExits.equal_range(ep->PubKey()); itr != end; ++itr)
{
if (itr->second->GetCurrentPath() == ep->GetCurrentPath())
{
itr = m_ActiveExits.erase(itr);
m_ActiveExits.erase(itr);
// now ep is gone af
return;
}
++itr;
}
}

View File

@ -103,12 +103,10 @@ namespace llarp
void
CalculateTrafficStats(Stats& stats)
{
auto itr = m_ActiveExits.begin();
while (itr != m_ActiveExits.end())
for (auto& [pubkey, endpoint] : m_ActiveExits)
{
stats[itr->first].first += itr->second->TxRate();
stats[itr->first].second += itr->second->RxRate();
++itr;
stats[pubkey].first += endpoint->TxRate();
stats[pubkey].second += endpoint->RxRate();
}
}

View File

@ -86,13 +86,7 @@ namespace llarp
TunEndpoint::TunEndpoint(AbstractRouter* r, service::Context* parent)
: service::Endpoint(r, parent)
, m_UserToNetworkPktQueue("endpoint_sendq", r->loop(), r->loop())
{
m_PacketSendWaker = r->loop()->make_waker([this]() { FlushWrite(); });
m_MessageSendWaker = r->loop()->make_waker([this]() {
FlushSend();
Pump(Now());
});
m_PacketRouter = std::make_unique<vpn::PacketRouter>(
[this](net::IPPacket pkt) { HandleGotUserPacket(std::move(pkt)); });
#if defined(ANDROID) || defined(__APPLE__)
@ -362,15 +356,7 @@ namespace llarp
}
void
TunEndpoint::Flush()
{
FlushSend();
Pump(Now());
FlushWrite();
}
void
TunEndpoint::FlushWrite()
TunEndpoint::Pump(llarp_time_t now)
{
// flush network to user
while (not m_NetworkToUserPktQueue.empty())
@ -378,6 +364,8 @@ namespace llarp
m_NetIf->WritePacket(m_NetworkToUserPktQueue.top().pkt);
m_NetworkToUserPktQueue.pop();
}
service::Endpoint::Pump(now);
}
static bool
@ -952,7 +940,6 @@ namespace llarp
LogInfo(Name(), " has ipv6 address ", m_OurIPv6);
}
#endif
Router()->loop()->add_ticker([this] { Flush(); });
// Attempt to register DNS on the interface
systemd_resolved_set_dns(
@ -1037,151 +1024,146 @@ namespace llarp
}
void
TunEndpoint::FlushSend()
TunEndpoint::HandleGotUserPacket(net::IPPacket pkt)
{
m_UserToNetworkPktQueue.Process([&](net::IPPacket& pkt) {
huint128_t dst, src;
if (pkt.IsV4())
huint128_t dst, src;
if (pkt.IsV4())
{
dst = pkt.dst4to6();
src = pkt.src4to6();
}
else
{
dst = pkt.dstv6();
src = pkt.srcv6();
}
// this is for ipv6 slaac on ipv6 exits
/*
constexpr huint128_t ipv6_multicast_all_nodes =
huint128_t{uint128_t{0xff01'0000'0000'0000UL, 1UL}};
constexpr huint128_t ipv6_multicast_all_routers =
huint128_t{uint128_t{0xff01'0000'0000'0000UL, 2UL}};
if (dst == ipv6_multicast_all_nodes and m_state->m_ExitEnabled)
{
// send ipv6 multicast
for (const auto& [ip, addr] : m_IPToAddr)
{
dst = pkt.dst4to6();
src = pkt.src4to6();
(void)ip;
SendToOrQueue(
service::Address{addr.as_array()}, pkt.ConstBuffer(), service::ProtocolType::Exit);
}
else
return;
}
*/
if (m_state->m_ExitEnabled)
{
dst = net::ExpandV4(net::TruncateV6(dst));
}
auto itr = m_IPToAddr.find(dst);
if (itr == m_IPToAddr.end())
{
// find all ranges that match the destination ip
const auto exitEntries = m_ExitMap.FindAllEntries(dst);
if (exitEntries.empty())
{
dst = pkt.dstv6();
src = pkt.srcv6();
}
// this is for ipv6 slaac on ipv6 exits
/*
constexpr huint128_t ipv6_multicast_all_nodes =
huint128_t{uint128_t{0xff01'0000'0000'0000UL, 1UL}};
constexpr huint128_t ipv6_multicast_all_routers =
huint128_t{uint128_t{0xff01'0000'0000'0000UL, 2UL}};
if (dst == ipv6_multicast_all_nodes and m_state->m_ExitEnabled)
{
// send ipv6 multicast
for (const auto& [ip, addr] : m_IPToAddr)
// send icmp unreachable as we dont have any exits for this ip
if (const auto icmp = pkt.MakeICMPUnreachable())
{
(void)ip;
SendToOrQueue(
service::Address{addr.as_array()}, pkt.ConstBuffer(), service::ProtocolType::Exit);
HandleWriteIPPacket(icmp->ConstBuffer(), dst, src, 0);
}
return;
}
*/
if (m_state->m_ExitEnabled)
service::Address addr{};
for (const auto& [range, exitAddr] : exitEntries)
{
dst = net::ExpandV4(net::TruncateV6(dst));
if (not IsBogon(dst) or range.BogonContains(dst))
{
addr = exitAddr;
}
// we do not permit bogons when they don't explicitly match a permitted bogon range
}
auto itr = m_IPToAddr.find(dst);
if (itr == m_IPToAddr.end())
{
// find all ranges that match the destination ip
const auto exitEntries = m_ExitMap.FindAllEntries(dst);
if (exitEntries.empty())
{
// send icmp unreachable as we dont have any exits for this ip
if (const auto icmp = pkt.MakeICMPUnreachable())
{
HandleWriteIPPacket(icmp->ConstBuffer(), dst, src, 0);
}
return;
}
service::Address addr{};
for (const auto& [range, exitAddr] : exitEntries)
{
if (range.BogonRange() and range.Contains(dst))
{
// we permit this because it matches our rules and we allow bogons
addr = exitAddr;
}
else if (not IsBogon(dst))
{
// allow because the destination is not a bogon and the mapped range is not a bogon
addr = exitAddr;
}
// we do not permit bogons when they don't explicitly match a permitted bogon range
}
if (addr.IsZero()) // drop becase no exit was found that matches our rules
return;
pkt.ZeroSourceAddress();
MarkAddressOutbound(addr);
EnsurePathToService(
addr,
[pkt](service::Address addr, service::OutboundContext* ctx) {
if (ctx)
{
ctx->SendPacketToRemote(pkt.ConstBuffer(), service::ProtocolType::Exit);
return;
}
LogWarn("cannot ensure path to exit ", addr, " so we drop some packets");
},
PathAlignmentTimeout());
if (addr.IsZero()) // drop becase no exit was found that matches our rules
return;
}
std::variant<service::Address, RouterID> to;
service::ProtocolType type;
if (m_SNodes.at(itr->second))
{
to = RouterID{itr->second.as_array()};
type = service::ProtocolType::TrafficV4;
}
else
{
to = service::Address{itr->second.as_array()};
type = m_state->m_ExitEnabled and src != m_OurIP ? service::ProtocolType::Exit
: pkt.ServiceProtocol();
}
// prepare packet for insertion into network
// this includes clearing IP addresses, recalculating checksums, etc
// this does not happen for exits because the point is they don't rewrite addresses
if (type != service::ProtocolType::Exit)
{
if (pkt.IsV4())
pkt.UpdateIPv4Address({0}, {0});
else
pkt.UpdateIPv6Address({0}, {0});
}
// try sending it on an existing convotag
// this succeds for inbound convos, probably.
if (auto maybe = GetBestConvoTagFor(to))
{
if (SendToOrQueue(*maybe, pkt.ConstBuffer(), type))
{
MarkIPActive(dst);
return;
}
}
// try establishing a path to this guy
// will fail if it's an inbound convo
EnsurePathTo(
to,
[pkt, type, dst, to, this](auto maybe) {
if (not maybe)
pkt.ZeroSourceAddress();
MarkAddressOutbound(addr);
EnsurePathToService(
addr,
[pkt, this](service::Address addr, service::OutboundContext* ctx) {
if (ctx)
{
var::visit(
[&](auto&& addr) {
LogWarn(Name(), " failed to ensure path to ", addr, " no convo tag found");
},
to);
}
if (SendToOrQueue(*maybe, pkt.ConstBuffer(), type))
{
MarkIPActive(dst);
}
else
{
var::visit(
[&](auto&& addr) {
LogWarn(Name(), " failed to send to ", addr, ", SendToOrQueue failed");
},
to);
ctx->SendPacketToRemote(pkt.ConstBuffer(), service::ProtocolType::Exit);
Router()->TriggerPump();
return;
}
LogWarn("cannot ensure path to exit ", addr, " so we drop some packets");
},
PathAlignmentTimeout());
});
return;
}
std::variant<service::Address, RouterID> to;
service::ProtocolType type;
if (m_SNodes.at(itr->second))
{
to = RouterID{itr->second.as_array()};
type = service::ProtocolType::TrafficV4;
}
else
{
to = service::Address{itr->second.as_array()};
type = m_state->m_ExitEnabled and src != m_OurIP ? service::ProtocolType::Exit
: pkt.ServiceProtocol();
}
// prepare packet for insertion into network
// this includes clearing IP addresses, recalculating checksums, etc
// this does not happen for exits because the point is they don't rewrite addresses
if (type != service::ProtocolType::Exit)
{
if (pkt.IsV4())
pkt.UpdateIPv4Address({0}, {0});
else
pkt.UpdateIPv6Address({0}, {0});
}
// try sending it on an existing convotag
// this succeds for inbound convos, probably.
if (auto maybe = GetBestConvoTagFor(to))
{
if (SendToOrQueue(*maybe, pkt.ConstBuffer(), type))
{
MarkIPActive(dst);
Router()->TriggerPump();
return;
}
}
// try establishing a path to this guy
// will fail if it's an inbound convo
EnsurePathTo(
to,
[pkt, type, dst, to, this](auto maybe) {
if (not maybe)
{
var::visit(
[this](auto&& addr) {
LogWarn(Name(), " failed to ensure path to ", addr, " no convo tag found");
},
to);
}
if (SendToOrQueue(*maybe, pkt.ConstBuffer(), type))
{
MarkIPActive(dst);
Router()->TriggerPump();
}
else
{
var::visit(
[this](auto&& addr) {
LogWarn(Name(), " failed to send to ", addr, ", SendToOrQueue failed");
},
to);
}
},
PathAlignmentTimeout());
}
bool
@ -1288,7 +1270,7 @@ namespace llarp
bool allow = false;
for (const auto& [range, exitAddr] : mapped)
{
if ((range.BogonRange() and range.Contains(src)) or not IsBogon(src))
if (not IsBogon(src) or range.BogonContains(src))
{
// allow if this address matches the endpoint we think it should be
allow = exitAddr == fromAddr;
@ -1335,8 +1317,8 @@ namespace llarp
pkt.UpdateIPv6Address(src, dst);
}
m_NetworkToUserPktQueue.push(std::move(write));
// wake up packet flushing event so we ensure that all packets are written to user
m_PacketSendWaker->Trigger();
// wake up so we ensure that all packets are written to user
Router()->TriggerPump();
return true;
}
@ -1441,13 +1423,6 @@ namespace llarp
m_IPActivity[ip] = std::numeric_limits<llarp_time_t>::max();
}
void
TunEndpoint::HandleGotUserPacket(net::IPPacket pkt)
{
m_UserToNetworkPktQueue.Emplace(std::move(pkt));
m_MessageSendWaker->Trigger();
}
TunEndpoint::~TunEndpoint() = default;
} // namespace handlers

View File

@ -112,10 +112,6 @@ namespace llarp
HandleWriteIPPacket(
const llarp_buffer_t& buf, huint128_t src, huint128_t dst, uint64_t seqno);
/// queue outbound packet to the world
bool
QueueOutboundTraffic(llarp::net::IPPacket&& pkt);
/// we got a packet from the user
void
HandleGotUserPacket(llarp::net::IPPacket pkt);
@ -172,10 +168,6 @@ namespace llarp
huint128_t
ObtainIPForAddr(std::variant<service::Address, RouterID> addr) override;
/// flush network traffic
void
Flush();
void
ResetInternalState() override;
@ -187,9 +179,6 @@ namespace llarp
net::IPPacket::CompareOrder,
net::IPPacket::GetNow>;
/// queue for sending packets over the network from us
PacketQueue_t m_UserToNetworkPktQueue;
struct WritePacket
{
uint64_t seqno;
@ -204,6 +193,10 @@ namespace llarp
/// queue for sending packets to user from network
std::priority_queue<WritePacket> m_NetworkToUserPktQueue;
void
Pump(llarp_time_t now) override;
/// return true if we have a remote loki address for this ip address
bool
HasRemoteForIP(huint128_t ipv4) const;
@ -216,10 +209,6 @@ namespace llarp
void
MarkIPActiveForever(huint128_t ip);
/// flush ip packets
virtual void
FlushSend();
/// flush writing ip packets to interface
void
FlushWrite();
@ -292,11 +281,6 @@ namespace llarp
std::set<IPRange> m_OwnedRanges;
/// how long to wait for path alignment
llarp_time_t m_PathAlignmentTimeout;
/// idempotent wakeup for writing packets to user
std::shared_ptr<EventLoopWakeup> m_PacketSendWaker;
/// idempotent wakeup for writing messages to network
std::shared_ptr<EventLoopWakeup> m_MessageSendWaker;
/// a file to load / store the ephemeral address map to
std::optional<fs::path> m_PersistAddrMapFile;

View File

@ -136,7 +136,7 @@ namespace llarp
{
auto job = std::make_shared<ExecShellHookJob>(shared_from_this(), std::move(params));
m_ThreadPool.addJob(std::bind(&ExecShellHookJob::Exec, job));
m_ThreadPool.addJob([job = std::move(job)] { job->Exec(); });
}
Backend_ptr

View File

@ -23,9 +23,7 @@ namespace llarp::iwp
: ILinkLayer(
keyManager, getrc, h, sign, before, est, reneg, timeout, closed, pumpDone, worker)
, m_Wakeup{ev->make_waker([this]() { HandleWakeupPlaintext(); })}
, m_PlaintextRecv{1024}
, m_Inbound{allowInbound}
{}
const char*
@ -58,14 +56,15 @@ namespace llarp::iwp
if (itr == m_AuthedAddrs.end())
{
Lock_t lock{m_PendingMutex};
if (m_Pending.count(from) == 0)
auto it = m_Pending.find(from);
if (it == m_Pending.end())
{
if (not m_Inbound)
return;
isNewSession = true;
m_Pending.insert({from, std::make_shared<Session>(this, from)});
it = m_Pending.emplace(from, std::make_shared<Session>(this, from)).first;
}
session = m_Pending.find(from)->second;
session = it->second;
}
else
{
@ -78,8 +77,9 @@ namespace llarp::iwp
if (not success and isNewSession)
{
LogWarn("Brand new session failed; removing from pending sessions list");
m_Pending.erase(m_Pending.find(from));
m_Pending.erase(from);
}
WakeupPlaintext();
}
}
@ -106,13 +106,6 @@ namespace llarp::iwp
return std::make_shared<Session>(this, rc, ai);
}
void
LinkLayer::AddWakeup(std::weak_ptr<Session> session)
{
if (auto ptr = session.lock())
m_PlaintextRecv[ptr->GetRemoteEndpoint()] = session;
}
void
LinkLayer::WakeupPlaintext()
{
@ -122,13 +115,15 @@ namespace llarp::iwp
void
LinkLayer::HandleWakeupPlaintext()
{
for (const auto& [addr, session] : m_PlaintextRecv)
{
auto ptr = session.lock();
if (ptr)
ptr->HandlePlaintext();
}
m_PlaintextRecv.clear();
// Copy bare pointers out first because HandlePlaintext can end up removing themselves from the
// structures.
m_WakingUp.clear(); // Reused to minimize allocations.
for (const auto& [router_id, session] : m_AuthedLinks)
m_WakingUp.push_back(session.get());
for (const auto& [addr, session] : m_Pending)
m_WakingUp.push_back(session.get());
for (auto* session : m_WakingUp)
session->HandlePlaintext();
PumpDone();
}

View File

@ -53,9 +53,6 @@ namespace llarp::iwp
void
WakeupPlaintext();
void
AddWakeup(std::weak_ptr<Session> peer);
std::string
PrintableName() const;
@ -64,8 +61,8 @@ namespace llarp::iwp
HandleWakeupPlaintext();
const std::shared_ptr<EventLoopWakeup> m_Wakeup;
std::unordered_map<SockAddr, std::weak_ptr<Session>> m_PlaintextRecv;
std::unordered_map<SockAddr, RouterID> m_AuthedAddrs;
std::vector<ILinkSession*> m_WakingUp;
const bool m_Inbound;
};

View File

@ -3,6 +3,7 @@
#include <llarp/messages/link_intro.hpp>
#include <llarp/messages/discard.hpp>
#include <llarp/util/meta/memfn.hpp>
#include <llarp/router/abstractrouter.hpp>
namespace llarp
{
@ -26,7 +27,7 @@ namespace llarp
return pkt;
}
constexpr size_t PlaintextQueueSize = 32;
constexpr size_t PlaintextQueueSize = 512;
Session::Session(LinkLayer* p, const RouterContact& rc, const AddressInfo& ai)
: m_State{State::Initial}
@ -39,6 +40,7 @@ namespace llarp
, m_PlaintextRecv{PlaintextQueueSize}
{
token.Zero();
m_PlaintextEmpty.test_and_set();
GotLIM = util::memFn(&Session::GotOutboundLIM, this);
CryptoManager::instance()->shorthash(m_SessionKey, llarp_buffer_t(rc.pubkey));
}
@ -52,6 +54,7 @@ namespace llarp
, m_PlaintextRecv{PlaintextQueueSize}
{
token.Randomize();
m_PlaintextEmpty.test_and_set();
GotLIM = util::memFn(&Session::GotInboundLIM, this);
const PubKey pk = m_Parent->GetOurRC().pubkey;
CryptoManager::instance()->shorthash(m_SessionKey, llarp_buffer_t(pk));
@ -94,9 +97,8 @@ namespace llarp
m_RemoteRC = msg->rc;
GotLIM = util::memFn(&Session::GotRenegLIM, this);
auto self = shared_from_this();
assert(self.use_count() > 1);
SendOurLIM([self](ILinkSession::DeliveryStatus st) {
assert(shared_from_this().use_count() > 1);
SendOurLIM([self = shared_from_this()](ILinkSession::DeliveryStatus st) {
if (st == ILinkSession::DeliveryStatus::eDeliverySuccess)
{
self->m_State = State::Ready;
@ -136,6 +138,7 @@ namespace llarp
Session::EncryptAndSend(ILinkSession::Packet_t data)
{
m_EncryptNext.emplace_back(std::move(data));
TriggerPump();
if (!IsEstablished())
{
EncryptWorker(std::move(m_EncryptNext));
@ -190,6 +193,7 @@ namespace llarp
const auto bufsz = buf.size();
auto& msg = m_TXMsgs.emplace(msgid, OutboundMessage{msgid, std::move(buf), now, completed})
.first->second;
TriggerPump();
EncryptAndSend(msg.XMIT());
if (bufsz > FragmentSize)
{
@ -225,6 +229,12 @@ namespace llarp
}
}
void
Session::TriggerPump()
{
m_Parent->Router()->TriggerPump();
}
void
Session::Pump()
{
@ -233,33 +243,33 @@ namespace llarp
{
if (ShouldPing())
SendKeepAlive();
for (auto& item : m_RXMsgs)
for (auto& [id, msg] : m_RXMsgs)
{
if (item.second.ShouldSendACKS(now))
if (msg.ShouldSendACKS(now))
{
item.second.SendACKS(util::memFn(&Session::EncryptAndSend, this), now);
msg.SendACKS(util::memFn(&Session::EncryptAndSend, this), now);
}
}
for (auto& item : m_TXMsgs)
for (auto& [id, msg] : m_TXMsgs)
{
if (item.second.ShouldFlush(now))
if (msg.ShouldFlush(now))
{
item.second.FlushUnAcked(util::memFn(&Session::EncryptAndSend, this), now);
msg.FlushUnAcked(util::memFn(&Session::EncryptAndSend, this), now);
}
}
}
auto self = shared_from_this();
assert(self.use_count() > 1);
assert(shared_from_this().use_count() > 1);
if (not m_EncryptNext.empty())
{
m_Parent->QueueWork([self, data = m_EncryptNext] { self->EncryptWorker(data); });
m_Parent->QueueWork(
[self = shared_from_this(), data = m_EncryptNext] { self->EncryptWorker(data); });
m_EncryptNext.clear();
}
if (not m_DecryptNext.empty())
{
m_Parent->AddWakeup(weak_from_this());
m_Parent->QueueWork([self, data = m_DecryptNext] { self->DecryptWorker(data); });
m_Parent->QueueWork(
[self = shared_from_this(), data = m_DecryptNext] { self->DecryptWorker(data); });
m_DecryptNext.clear();
}
}
@ -613,6 +623,7 @@ namespace llarp
Session::HandleSessionData(Packet_t pkt)
{
m_DecryptNext.emplace_back(std::move(pkt));
TriggerPump();
}
void
@ -638,16 +649,18 @@ namespace llarp
++itr;
}
m_PlaintextRecv.tryPushBack(std::move(msgs));
m_PlaintextEmpty.clear();
m_Parent->WakeupPlaintext();
}
void
Session::HandlePlaintext()
{
while (not m_PlaintextRecv.empty())
if (m_PlaintextEmpty.test_and_set())
return;
while (auto maybe_queue = m_PlaintextRecv.tryPopFront())
{
auto queue = m_PlaintextRecv.popFront();
for (auto& result : queue)
for (auto& result : *maybe_queue)
{
LogTrace("Command ", int(result[PacketOverhead + 1]), " from ", m_RemoteAddr);
switch (result[PacketOverhead + 1])
@ -679,7 +692,7 @@ namespace llarp
}
}
SendMACK();
Pump();
m_Parent->WakeupPlaintext();
}
void
@ -774,6 +787,8 @@ namespace llarp
{
itr = m_RXMsgs.emplace(rxid, InboundMessage{rxid, sz, ShortHash{pos}, m_Parent->Now()})
.first;
TriggerPump();
sz = std::min(sz, uint16_t{FragmentSize});
if ((data.size() - XMITOverhead) == sz)
{

View File

@ -48,8 +48,11 @@ namespace llarp
/// inbound session
Session(LinkLayer* parent, const SockAddr& from);
~Session() = default;
// Signal the event loop that a pump is needed (idempotent)
void
TriggerPump();
// Does the actual pump
void
Pump() override;
@ -129,7 +132,7 @@ namespace llarp
return m_Inbound;
}
void
HandlePlaintext();
HandlePlaintext() override;
private:
enum class State
@ -191,13 +194,14 @@ namespace llarp
/// maps rxid to time recieved
std::unordered_map<uint64_t, llarp_time_t> m_ReplayFilter;
/// rx messages to send in next round of multiacks
std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>> m_SendMACKs;
std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<>> m_SendMACKs;
using CryptoQueue_t = std::vector<Packet_t>;
CryptoQueue_t m_EncryptNext;
CryptoQueue_t m_DecryptNext;
std::atomic_flag m_PlaintextEmpty;
llarp::thread::Queue<CryptoQueue_t> m_PlaintextRecv;
void

View File

@ -7,6 +7,7 @@
#include <llarp/util/fs.hpp>
#include <utility>
#include <unordered_set>
#include <llarp/router/abstractrouter.hpp>
static constexpr auto LINK_LAYER_TICK_INTERVAL = 100ms;
@ -38,7 +39,11 @@ namespace llarp
, m_SecretKey(keyManager->transportKey)
{}
ILinkLayer::~ILinkLayer() = default;
llarp_time_t
ILinkLayer::Now() const
{
return m_Router->loop()->time_now();
}
bool
ILinkLayer::HasSessionTo(const RouterID& id)
@ -124,10 +129,10 @@ namespace llarp
}
bool
ILinkLayer::Configure(EventLoop_ptr loop, const std::string& ifname, int af, uint16_t port)
ILinkLayer::Configure(AbstractRouter* router, const std::string& ifname, int af, uint16_t port)
{
m_Loop = std::move(loop);
m_udp = m_Loop->make_udp(
m_Router = router;
m_udp = m_Router->loop()->make_udp(
[this]([[maybe_unused]] UDPHandle& udp, const SockAddr& from, llarp_buffer_t buf) {
ILinkSession::Packet_t pkt;
pkt.resize(buf.sz);
@ -163,7 +168,6 @@ namespace llarp
if (not m_udp->listen(m_ourAddr))
return false;
m_Loop->add_ticker([this] { Pump(); });
return true;
}
@ -247,6 +251,7 @@ namespace llarp
}
m_AuthedLinks.emplace(pk, itr->second);
itr = m_Pending.erase(itr);
m_Router->TriggerPump();
return true;
}
return false;
@ -345,7 +350,8 @@ namespace llarp
{
// Tie the lifetime of this repeater to this arbitrary shared_ptr:
m_repeater_keepalive = std::make_shared<int>(0);
m_Loop->call_every(LINK_LAYER_TICK_INTERVAL, m_repeater_keepalive, [this] { Tick(Now()); });
m_Router->loop()->call_every(
LINK_LAYER_TICK_INTERVAL, m_repeater_keepalive, [this] { Tick(Now()); });
return true;
}
@ -402,8 +408,7 @@ namespace llarp
Lock_t l(m_AuthedLinksMutex);
RouterID r = remote;
llarp::LogInfo("Closing all to ", r);
auto [itr, end] = m_AuthedLinks.equal_range(r);
while (itr != end)
for (auto [itr, end] = m_AuthedLinks.equal_range(r); itr != end;)
{
itr->second->Close();
m_RecentlyClosed.emplace(itr->second->GetRemoteEndpoint(), now + CloseGraceWindow);

View File

@ -85,14 +85,11 @@ namespace llarp
SessionClosedHandler closed,
PumpDoneHandler pumpDone,
WorkerFunc_t doWork);
virtual ~ILinkLayer();
virtual ~ILinkLayer() = default;
/// get current time via event loop
llarp_time_t
Now() const
{
return m_Loop->time_now();
}
Now() const;
bool
HasSessionTo(const RouterID& pk);
@ -108,7 +105,7 @@ namespace llarp
SendTo_LL(const SockAddr& to, const llarp_buffer_t& pkt);
virtual bool
Configure(EventLoop_ptr loop, const std::string& ifname, int af, uint16_t port);
Configure(AbstractRouter* loop, const std::string& ifname, int af, uint16_t port);
virtual std::shared_ptr<ILinkSession>
NewOutboundSession(const RouterContact& rc, const AddressInfo& ai) = 0;
@ -225,6 +222,13 @@ namespace llarp
std::optional<int>
GetUDPFD() const;
// Gets a pointer to the router owning us.
AbstractRouter*
Router() const
{
return m_Router;
}
private:
const SecretKey& m_RouterEncSecret;
@ -239,7 +243,7 @@ namespace llarp
bool
PutSession(const std::shared_ptr<ILinkSession>& s);
EventLoop_ptr m_Loop;
AbstractRouter* m_Router;
SockAddr m_ourAddr;
std::shared_ptr<llarp::UDPHandle> m_udp;
SecretKey m_SecretKey;

View File

@ -42,7 +42,7 @@ namespace llarp
virtual void
OnLinkEstablished(ILinkLayer*){};
/// called every event loop tick
/// called during pumping
virtual void
Pump() = 0;
@ -130,5 +130,8 @@ namespace llarp
virtual util::StatusObject
ExtractStatus() const = 0;
virtual void
HandlePlaintext() = 0;
};
} // namespace llarp

View File

@ -153,18 +153,18 @@ struct lokinet_srv_lookup_private
auto lock = ctx->acquire();
if (ctx->impl and ctx->impl->IsUp())
{
ctx->impl->CallSafe([host, service, &promise, ctx, self = this]() {
ctx->impl->CallSafe([host, service, &promise, ctx, this]() {
auto ep = ctx->endpoint();
if (ep == nullptr)
{
promise.set_value(ENOTSUP);
return;
}
ep->LookupServiceAsync(host, service, [self, &promise, host](auto results) {
ep->LookupServiceAsync(host, service, [this, &promise, host](auto results) {
for (const auto& result : results)
{
if (auto maybe = SRVFromData(result, host))
self->results.emplace_back(*maybe);
this->results.emplace_back(*maybe);
}
promise.set_value(0);
});

View File

@ -31,12 +31,12 @@ namespace llarp
uint64_t _status,
HopHandler_ptr _hop,
AbstractRouter* _router,
const PathID_t& pathid)
PathID_t pathid)
: frames{std::move(_frames)}
, status{_status}
, hop{std::move(_hop)}
, router{_router}
, pathid{pathid}
, pathid{std::move(pathid)}
{}
~LRSM_AsyncHandler() = default;
@ -51,7 +51,7 @@ namespace llarp
void
queue_handle()
{
auto func = std::bind(&llarp::LRSM_AsyncHandler::handle, shared_from_this());
auto func = [self = shared_from_this()] { self->handle(); };
router->QueueWork(func);
}
};

View File

@ -52,6 +52,14 @@ namespace llarp
return IsBogon(addr) or IsBogon(HighestAddr());
}
/// return true if we intersect with a bogon range *and* we contain the given address
template <typename Addr>
bool
BogonContains(Addr&& addr) const
{
return BogonRange() and Contains(std::forward<Addr>(addr));
}
/// return the number of bits set in the hostmask
constexpr int
HostmaskBits() const
@ -63,6 +71,13 @@ namespace llarp
return bits::count_bits(netmask_bits);
}
/// return true if our range and other intersect
constexpr bool
operator*(const IPRange& other) const
{
return Contains(other) or other.Contains(*this);
}
/// return true if the other range is inside our range
constexpr bool
Contains(const IPRange& other) const

View File

@ -503,10 +503,10 @@ namespace llarp
IPRange{net::ExpandV4(xntohl(ifaddr)), net::ExpandV4(xntohl(ifmask))});
}
});
auto ownsRange = [&currentRanges](IPRange range) -> bool {
auto ownsRange = [&currentRanges](const IPRange& range) -> bool {
for (const auto& ownRange : currentRanges)
{
if (ownRange.Contains(range))
if (ownRange * range)
return true;
}
return false;

View File

@ -9,14 +9,11 @@ namespace llarp
bool
IHopHandler::HandleUpstream(const llarp_buffer_t& X, const TunnelNonce& Y, AbstractRouter* r)
{
if (m_UpstreamQueue == nullptr)
m_UpstreamQueue = std::make_shared<TrafficQueue_t>();
m_UpstreamQueue->emplace_back();
auto& pkt = m_UpstreamQueue->back();
auto& pkt = m_UpstreamQueue.emplace_back();
pkt.first.resize(X.sz);
std::copy_n(X.base, X.sz, pkt.first.begin());
pkt.second = Y;
r->loop()->wakeup();
r->TriggerPump();
return true;
}
@ -24,14 +21,11 @@ namespace llarp
bool
IHopHandler::HandleDownstream(const llarp_buffer_t& X, const TunnelNonce& Y, AbstractRouter* r)
{
if (m_DownstreamQueue == nullptr)
m_DownstreamQueue = std::make_shared<TrafficQueue_t>();
m_DownstreamQueue->emplace_back();
auto& pkt = m_DownstreamQueue->back();
auto& pkt = m_DownstreamQueue.emplace_back();
pkt.first.resize(X.sz);
std::copy_n(X.base, X.sz, pkt.first.begin());
pkt.second = Y;
r->loop()->wakeup();
r->TriggerPump();
return true;
}

View File

@ -26,7 +26,6 @@ namespace llarp
{
using TrafficEvent_t = std::pair<std::vector<byte_t>, TunnelNonce>;
using TrafficQueue_t = std::list<TrafficEvent_t>;
using TrafficQueue_ptr = std::shared_ptr<TrafficQueue_t>;
virtual ~IHopHandler() = default;
@ -74,16 +73,16 @@ namespace llarp
protected:
uint64_t m_SequenceNum = 0;
TrafficQueue_ptr m_UpstreamQueue;
TrafficQueue_ptr m_DownstreamQueue;
TrafficQueue_t m_UpstreamQueue;
TrafficQueue_t m_DownstreamQueue;
util::DecayingHashSet<TunnelNonce> m_UpstreamReplayFilter;
util::DecayingHashSet<TunnelNonce> m_DownstreamReplayFilter;
virtual void
UpstreamWork(TrafficQueue_ptr queue, AbstractRouter* r) = 0;
UpstreamWork(TrafficQueue_t queue, AbstractRouter* r) = 0;
virtual void
DownstreamWork(TrafficQueue_ptr queue, AbstractRouter* r) = 0;
DownstreamWork(TrafficQueue_t queue, AbstractRouter* r) = 0;
virtual void
HandleAllUpstream(std::vector<RelayUpstreamMessage> msgs, AbstractRouter* r) = 0;

View File

@ -29,7 +29,7 @@ namespace llarp
std::weak_ptr<PathSet> pathset,
PathRole startingRoles,
std::string shortName)
: m_PathSet{pathset}, _role{startingRoles}, m_shortName{std::move(shortName)}
: m_PathSet{std::move(pathset)}, _role{startingRoles}, m_shortName{std::move(shortName)}
{
hops.resize(h.size());
@ -488,15 +488,15 @@ namespace llarp
LogDebug("failed to send upstream to ", Upstream());
}
}
r->linkManager().PumpLinks();
r->TriggerPump();
}
void
Path::UpstreamWork(TrafficQueue_ptr msgs, AbstractRouter* r)
Path::UpstreamWork(TrafficQueue_t msgs, AbstractRouter* r)
{
std::vector<RelayUpstreamMessage> sendmsgs(msgs->size());
std::vector<RelayUpstreamMessage> sendmsgs(msgs.size());
size_t idx = 0;
for (auto& ev : *msgs)
for (auto& ev : msgs)
{
const llarp_buffer_t buf(ev.first);
TunnelNonce n = ev.second;
@ -519,24 +519,22 @@ namespace llarp
void
Path::FlushUpstream(AbstractRouter* r)
{
if (m_UpstreamQueue && not m_UpstreamQueue->empty())
if (not m_UpstreamQueue.empty())
{
TrafficQueue_ptr data = nullptr;
std::swap(m_UpstreamQueue, data);
r->QueueWork(
[self = shared_from_this(), data, r]() { self->UpstreamWork(std::move(data), r); });
r->QueueWork([self = shared_from_this(),
data = std::exchange(m_UpstreamQueue, {}),
r]() mutable { self->UpstreamWork(std::move(data), r); });
}
}
void
Path::FlushDownstream(AbstractRouter* r)
{
if (m_DownstreamQueue && not m_DownstreamQueue->empty())
if (not m_DownstreamQueue.empty())
{
TrafficQueue_ptr data = nullptr;
std::swap(m_DownstreamQueue, data);
r->QueueWork(
[self = shared_from_this(), data, r]() { self->DownstreamWork(std::move(data), r); });
r->QueueWork([self = shared_from_this(),
data = std::exchange(m_DownstreamQueue, {}),
r]() mutable { self->DownstreamWork(std::move(data), r); });
}
}
@ -570,11 +568,11 @@ namespace llarp
}
void
Path::DownstreamWork(TrafficQueue_ptr msgs, AbstractRouter* r)
Path::DownstreamWork(TrafficQueue_t msgs, AbstractRouter* r)
{
std::vector<RelayDownstreamMessage> sendMsgs(msgs->size());
std::vector<RelayDownstreamMessage> sendMsgs(msgs.size());
size_t idx = 0;
for (auto& ev : *msgs)
for (auto& ev : msgs)
{
const llarp_buffer_t buf(ev.first);
sendMsgs[idx].Y = ev.second;
@ -600,7 +598,7 @@ namespace llarp
m_RXRate += buf.sz;
if (HandleRoutingMessage(buf, r))
{
r->loop()->wakeup();
r->TriggerPump();
m_LastRecvMessage = r->Now();
}
}

View File

@ -388,10 +388,10 @@ namespace llarp
protected:
void
UpstreamWork(TrafficQueue_ptr queue, AbstractRouter* r) override;
UpstreamWork(TrafficQueue_t queue, AbstractRouter* r) override;
void
DownstreamWork(TrafficQueue_ptr queue, AbstractRouter* r) override;
DownstreamWork(TrafficQueue_t queue, AbstractRouter* r) override;
void
HandleAllUpstream(std::vector<RelayUpstreamMessage> msgs, AbstractRouter* r) override;

View File

@ -125,8 +125,11 @@ namespace llarp
Mutex_t first; // protects second
TransitHopsMap_t second GUARDED_BY(first);
/// Invokes a callback for each transit path; visit must be invokable with a `const
/// TransitHop_ptr&` argument.
template <typename TransitHopVisitor>
void
ForEach(std::function<void(const TransitHop_ptr&)> visit) EXCLUDES(first)
ForEach(TransitHopVisitor&& visit) EXCLUDES(first)
{
Lock_t lock(first);
for (const auto& item : second)
@ -142,8 +145,11 @@ namespace llarp
util::Mutex first; // protects second
OwnedPathsMap_t second GUARDED_BY(first);
/// Invokes a callback for each owned path; visit must be invokable with a `const Path_ptr&`
/// argument.
template <typename OwnedHopVisitor>
void
ForEach(std::function<void(const Path_ptr&)> visit)
ForEach(OwnedHopVisitor&& visit)
{
util::Lock lock(first);
for (const auto& item : second)

View File

@ -104,7 +104,7 @@ namespace llarp
}
void
TransitHop::DownstreamWork(TrafficQueue_ptr msgs, AbstractRouter* r)
TransitHop::DownstreamWork(TrafficQueue_t msgs, AbstractRouter* r)
{
auto flushIt = [self = shared_from_this(), r]() {
std::vector<RelayDownstreamMessage> msgs;
@ -114,7 +114,7 @@ namespace llarp
}
self->HandleAllDownstream(std::move(msgs), r);
};
for (auto& ev : *msgs)
for (auto& ev : msgs)
{
RelayDownstreamMessage msg;
const llarp_buffer_t buf(ev.first);
@ -140,17 +140,9 @@ namespace llarp
}
void
TransitHop::UpstreamWork(TrafficQueue_ptr msgs, AbstractRouter* r)
TransitHop::UpstreamWork(TrafficQueue_t msgs, AbstractRouter* r)
{
auto flushIt = [self = shared_from_this(), r]() {
std::vector<RelayUpstreamMessage> msgs;
while (auto maybe = self->m_UpstreamGather.tryPopFront())
{
msgs.push_back(*maybe);
}
self->HandleAllUpstream(std::move(msgs), r);
};
for (auto& ev : *msgs)
for (auto& ev : msgs)
{
const llarp_buffer_t buf(ev.first);
RelayUpstreamMessage msg;
@ -158,14 +150,19 @@ namespace llarp
msg.pathid = info.txID;
msg.Y = ev.second ^ nonceXOR;
msg.X = buf;
if (m_UpstreamGather.full())
{
r->loop()->call(flushIt);
}
if (m_UpstreamGather.enabled())
m_UpstreamGather.pushBack(msg);
if (m_UpstreamGather.tryPushBack(msg) != thread::QueueReturn::Success)
break;
}
r->loop()->call(flushIt);
// Flush it:
r->loop()->call([self = shared_from_this(), r] {
std::vector<RelayUpstreamMessage> msgs;
while (auto maybe = self->m_UpstreamGather.tryPopFront())
{
msgs.push_back(*maybe);
}
self->HandleAllUpstream(std::move(msgs), r);
});
}
void
@ -188,7 +185,6 @@ namespace llarp
other->FlushDownstream(r);
}
m_FlushOthers.clear();
r->loop()->wakeup();
}
else
{
@ -203,8 +199,8 @@ namespace llarp
info.upstream);
r->SendToOrQueue(info.upstream, msg);
}
r->linkManager().PumpLinks();
}
r->TriggerPump();
}
void
@ -221,31 +217,29 @@ namespace llarp
info.downstream);
r->SendToOrQueue(info.downstream, msg);
}
r->linkManager().PumpLinks();
r->TriggerPump();
}
void
TransitHop::FlushUpstream(AbstractRouter* r)
{
if (m_UpstreamQueue && not m_UpstreamQueue->empty())
if (not m_UpstreamQueue.empty())
{
r->QueueWork([self = shared_from_this(), data = std::move(m_UpstreamQueue), r]() mutable {
self->UpstreamWork(std::move(data), r);
});
r->QueueWork([self = shared_from_this(),
data = std::exchange(m_UpstreamQueue, {}),
r]() mutable { self->UpstreamWork(std::move(data), r); });
}
m_UpstreamQueue = nullptr;
}
void
TransitHop::FlushDownstream(AbstractRouter* r)
{
if (m_DownstreamQueue && not m_DownstreamQueue->empty())
if (not m_DownstreamQueue.empty())
{
r->QueueWork([self = shared_from_this(), data = std::move(m_DownstreamQueue), r]() mutable {
self->DownstreamWork(std::move(data), r);
});
r->QueueWork([self = shared_from_this(),
data = std::exchange(m_DownstreamQueue, {}),
r]() mutable { self->DownstreamWork(std::move(data), r); });
}
m_DownstreamQueue = nullptr;
}
/// this is where a DHT message is handled at the end of a path, that is,

View File

@ -190,10 +190,10 @@ namespace llarp
protected:
void
UpstreamWork(TrafficQueue_ptr queue, AbstractRouter* r) override;
UpstreamWork(TrafficQueue_t queue, AbstractRouter* r) override;
void
DownstreamWork(TrafficQueue_ptr queue, AbstractRouter* r) override;
DownstreamWork(TrafficQueue_t queue, AbstractRouter* r) override;
void
HandleAllUpstream(std::vector<RelayUpstreamMessage> msgs, AbstractRouter* r) override;

View File

@ -210,9 +210,9 @@ namespace llarp
virtual void
Die() = 0;
/// pump low level links
/// Trigger a pump of low level links. Idempotent.
virtual void
PumpLL() = 0;
TriggerPump() = 0;
virtual bool
IsBootstrapNode(RouterID r) const = 0;

View File

@ -35,7 +35,7 @@ namespace llarp
QueueMessage(const RouterID& remote, const ILinkMessage& msg, SendStatusHandler callback) = 0;
virtual void
Tick() = 0;
Pump() = 0;
virtual void
RemovePath(const PathID_t& pathid) = 0;

View File

@ -1,9 +1,7 @@
#include "outbound_message_handler.hpp"
#include <llarp/messages/link_message.hpp>
#include "i_outbound_session_maker.hpp"
#include "i_rc_lookup_handler.hpp"
#include <llarp/link/i_link_manager.hpp>
#include "router.hpp"
#include <llarp/constants/link_layer.hpp>
#include <llarp/util/meta/memfn.hpp>
#include <llarp/util/status.hpp>
@ -26,7 +24,8 @@ namespace llarp
const RouterID& remote, const ILinkMessage& msg, SendStatusHandler callback)
{
// if the destination is invalid, callback with failure and return
if (not _linkManager->SessionIsClient(remote) and not _lookupHandler->SessionIsAllowed(remote))
if (not _router->linkManager().SessionIsClient(remote)
and not _router->rcLookupHandler().SessionIsAllowed(remote))
{
DoCallback(callback, SendStatus::InvalidRouter);
return true;
@ -47,7 +46,7 @@ namespace llarp
std::copy_n(buf.base, buf.sz, message.first.data());
// if we have a session to the destination, queue the message and return
if (_linkManager->HasSessionTo(remote))
if (_router->linkManager().HasSessionTo(remote))
{
QueueOutboundMessage(remote, std::move(message), msg.pathid, priority);
return true;
@ -82,12 +81,13 @@ namespace llarp
}
void
OutboundMessageHandler::Tick()
OutboundMessageHandler::Pump()
{
m_Killer.TryAccess([this]() {
recentlyRemovedPaths.Decay();
ProcessOutboundQueue();
SendRoundRobin();
if (/*bool more = */ SendRoundRobin())
_router->TriggerPump();
});
}
@ -127,13 +127,9 @@ namespace llarp
}
void
OutboundMessageHandler::Init(
ILinkManager* linkManager, I_RCLookupHandler* lookupHandler, EventLoop_ptr loop)
OutboundMessageHandler::Init(AbstractRouter* router)
{
_linkManager = linkManager;
_lookupHandler = lookupHandler;
_loop = std::move(loop);
_router = router;
outboundMessageQueues.emplace(zeroID, MessageQueue());
}
@ -168,14 +164,14 @@ namespace llarp
OutboundMessageHandler::DoCallback(SendStatusHandler callback, SendStatus status)
{
if (callback)
_loop->call([f = std::move(callback), status] { f(status); });
_router->loop()->call([f = std::move(callback), status] { f(status); });
}
void
OutboundMessageHandler::QueueSessionCreation(const RouterID& remote)
{
auto fn = util::memFn(&OutboundMessageHandler::OnSessionResult, this);
_linkManager->GetSessionMaker()->CreateSessionTo(remote, fn);
_router->linkManager().GetSessionMaker()->CreateSessionTo(remote, fn);
}
bool
@ -199,7 +195,7 @@ namespace llarp
const llarp_buffer_t buf(msg.first);
auto callback = msg.second;
m_queueStats.sent++;
return _linkManager->SendTo(remote, buf, [=](ILinkSession::DeliveryStatus status) {
return _router->linkManager().SendTo(remote, buf, [=](ILinkSession::DeliveryStatus status) {
if (status == ILinkSession::DeliveryStatus::eDeliverySuccess)
DoCallback(callback, SendStatus::Success);
else
@ -212,7 +208,7 @@ namespace llarp
bool
OutboundMessageHandler::SendIfSession(const RouterID& remote, const Message& msg)
{
if (_linkManager->HasSessionTo(remote))
if (_router->linkManager().HasSessionTo(remote))
{
return Send(remote, msg);
}
@ -258,7 +254,7 @@ namespace llarp
// so check here if the pathid was recently removed.
if (recentlyRemovedPaths.Contains(entry.pathid))
{
return;
continue;
}
auto [queue_itr, is_new] = outboundMessageQueues.emplace(entry.pathid, MessageQueue());
@ -282,7 +278,7 @@ namespace llarp
}
}
void
bool
OutboundMessageHandler::SendRoundRobin()
{
m_queueStats.numTicks++;
@ -296,7 +292,6 @@ namespace llarp
routing_mq.pop();
}
size_t empty_count = 0;
size_t num_queues = roundRobinOrder.size();
// if any paths have been removed since last tick, remove any stale
@ -317,16 +312,16 @@ namespace llarp
removedSomePaths = false;
num_queues = roundRobinOrder.size();
size_t sent_count = 0;
if (num_queues == 0) // if no queues, return
if (num_queues == 0)
{
return;
return false;
}
// send messages for each pathid in roundRobinOrder, stopping when
// either every path's queue is empty or a set maximum amount of
// messages have been sent.
while (sent_count < MAX_OUTBOUND_MESSAGES_PER_TICK)
size_t consecutive_empty = 0;
for (size_t sent_count = 0; sent_count < MAX_OUTBOUND_MESSAGES_PER_TICK;)
{
PathID_t pathid = std::move(roundRobinOrder.front());
roundRobinOrder.pop();
@ -339,24 +334,26 @@ namespace llarp
Send(entry.router, entry.message);
message_queue.pop();
empty_count = 0;
sent_count++;
consecutive_empty = 0;
consecutive_empty++;
}
else
{
empty_count++;
consecutive_empty++;
}
roundRobinOrder.push(std::move(pathid));
// if num_queues empty queues in a row, all queues empty.
if (empty_count == num_queues)
if (consecutive_empty == num_queues)
{
break;
}
}
m_queueStats.perTickMax = std::max((uint32_t)sent_count, m_queueStats.perTickMax);
m_queueStats.perTickMax = std::max((uint32_t)consecutive_empty, m_queueStats.perTickMax);
return consecutive_empty != num_queues;
}
void

View File

@ -17,8 +17,7 @@ struct llarp_buffer_t;
namespace llarp
{
struct ILinkManager;
struct I_RCLookupHandler;
struct AbstractRouter;
enum class SessionResult;
struct OutboundMessageHandler final : public IOutboundMessageHandler
@ -35,9 +34,9 @@ namespace llarp
* router, one is created.
*
* If there is a session to the destination router, the message is placed on the shared
* outbound message queue to be processed on Tick().
* outbound message queue to be processed on Pump().
*
* When this class' Tick() is called, that queue is emptied and the messages there
* When this class' Pump() is called, that queue is emptied and the messages there
* are placed in their paths' respective individual queues.
*
* Returns false if encoding the message into a buffer fails, true otherwise.
@ -48,7 +47,7 @@ namespace llarp
QueueMessage(const RouterID& remote, const ILinkMessage& msg, SendStatusHandler callback)
override EXCLUDES(_mutex);
/* Called once per event loop tick.
/* Called when pumping output queues, typically scheduled via a call to Router::TriggerPump().
*
* Processes messages on the shared message queue into their paths' respective
* individual queues.
@ -60,7 +59,7 @@ namespace llarp
* Sends messages from path queues until all are empty or a set cap has been reached.
*/
void
Tick() override;
Pump() override;
/* Called from outside this class to inform it that a path has died / expired
* and its queue should be discarded.
@ -72,7 +71,7 @@ namespace llarp
ExtractStatus() const override;
void
Init(ILinkManager* linkManager, I_RCLookupHandler* lookupHandler, EventLoop_ptr loop);
Init(AbstractRouter* router);
private:
using Message = std::pair<std::vector<byte_t>, SendStatusHandler>;
@ -146,7 +145,7 @@ namespace llarp
* If the queue is full, the message is dropped and the message's status
* callback is invoked with a congestion status.
*
* When this class' Tick() is called, that queue is emptied and the messages there
* When this class' Pump() is called, that queue is emptied and the messages there
* are placed in their paths' respective individual queues.
*/
bool
@ -160,14 +159,17 @@ namespace llarp
ProcessOutboundQueue();
/*
* Sends all routing messages that have been queued, indicated by pathid 0 when queued.
* Sends routing messages that have been queued, indicated by pathid 0 when queued.
*
* Sends messages from path queues until all are empty or a set cap has been reached.
* This will send one message from each queue in a round-robin fashion such that they
* all have roughly equal access to bandwidth. A notion of priority may be introduced
* at a later time, but for now only routing messages get priority.
*
* Returns true if there is more to send (i.e. we hit the limit before emptying all path
* queues), false if all queues were drained.
*/
void
bool
SendRoundRobin();
/* Invoked when an outbound session establish attempt has concluded.
@ -193,9 +195,7 @@ namespace llarp
std::queue<PathID_t> roundRobinOrder;
ILinkManager* _linkManager;
I_RCLookupHandler* _lookupHandler;
EventLoop_ptr _loop;
AbstractRouter* _router;
util::ContentionKiller m_Killer;

View File

@ -94,8 +94,7 @@ namespace llarp
if (shouldDoLookup)
{
auto fn =
std::bind(&RCLookupHandler::HandleDHTLookupResult, this, router, std::placeholders::_1);
auto fn = [this, router](const auto& res) { HandleDHTLookupResult(router, res); };
// if we are a client try using the hidden service endpoints
if (!isServiceNode)
@ -232,7 +231,7 @@ namespace llarp
if (!SessionIsAllowed(newrc.pubkey))
return false;
auto func = std::bind(&RCLookupHandler::CheckRC, this, newrc);
auto func = [this, newrc] { CheckRC(newrc); };
_work(func);
// update dht if required

View File

@ -69,6 +69,7 @@ namespace llarp
_running.store(false);
_lastTick = llarp::time_now_ms();
m_NextExploreAt = Clock_t::now();
m_Pump = _loop->make_waker([this]() { PumpLL(); });
}
Router::~Router()
@ -76,6 +77,20 @@ namespace llarp
llarp_dht_context_free(_dht);
}
void
Router::PumpLL()
{
llarp::LogTrace("Router::PumpLL() start");
if (_stopping.load())
return;
paths.PumpDownstream();
paths.PumpUpstream();
_hiddenServiceContext.Pump();
_outboundMessageHandler.Pump();
_linkManager.PumpLinks();
llarp::LogTrace("Router::PumpLL() end");
}
util::StatusObject
Router::ExtractStatus() const
{
@ -90,10 +105,7 @@ namespace llarp
{"links", _linkManager.ExtractStatus()},
{"outboundMessages", _outboundMessageHandler.ExtractStatus()}};
}
else
{
return util::StatusObject{{"running", false}};
}
return util::StatusObject{{"running", false}};
}
util::StatusObject
@ -243,16 +255,9 @@ namespace llarp
}
void
Router::PumpLL()
Router::TriggerPump()
{
llarp::LogTrace("Router::PumpLL() start");
if (_stopping.load())
return;
paths.PumpDownstream();
paths.PumpUpstream();
_outboundMessageHandler.Tick();
_linkManager.PumpLinks();
llarp::LogTrace("Router::PumpLL() end");
m_Pump->Trigger();
}
bool
@ -652,7 +657,7 @@ namespace llarp
LogInfo("Loaded ", bootstrapRCList.size(), " bootstrap routers");
// Init components after relevant config settings loaded
_outboundMessageHandler.Init(&_linkManager, &_rcLookupHandler, _loop);
_outboundMessageHandler.Init(this);
_outboundSessionMaker.Init(
this,
&_linkManager,
@ -701,13 +706,13 @@ namespace llarp
util::memFn(&AbstractRouter::CheckRenegotiateValid, this),
util::memFn(&Router::ConnectionTimedOut, this),
util::memFn(&AbstractRouter::SessionClosed, this),
util::memFn(&AbstractRouter::PumpLL, this),
util::memFn(&AbstractRouter::TriggerPump, this),
util::memFn(&AbstractRouter::QueueWork, this));
const std::string& key = serverConfig.interface;
int af = serverConfig.addressFamily;
uint16_t port = serverConfig.port;
if (!server->Configure(loop(), key, af, port))
if (!server->Configure(this, key, af, port))
{
throw std::runtime_error(stringify("failed to bind inbound link on ", key, " port ", port));
}
@ -1239,8 +1244,6 @@ namespace llarp
#ifdef _WIN32
// windows uses proactor event loop so we need to constantly pump
_loop->add_ticker([this] { PumpLL(); });
#else
_loop->set_pump_function([this] { PumpLL(); });
#endif
_loop->call_every(ROUTER_TICK_INTERVAL, weak_from_this(), [this] { Tick(); });
_running.store(true);
@ -1464,10 +1467,7 @@ namespace llarp
void
Router::QueueWork(std::function<void(void)> func)
{
if (m_isServiceNode)
_loop->call_soon(std::move(func));
else
m_lmq->job(std::move(func));
m_lmq->job(std::move(func));
}
void
@ -1507,7 +1507,7 @@ namespace llarp
util::memFn(&AbstractRouter::CheckRenegotiateValid, this),
util::memFn(&Router::ConnectionTimedOut, this),
util::memFn(&AbstractRouter::SessionClosed, this),
util::memFn(&AbstractRouter::PumpLL, this),
util::memFn(&AbstractRouter::TriggerPump, this),
util::memFn(&AbstractRouter::QueueWork, this));
if (!link)
@ -1515,7 +1515,7 @@ namespace llarp
for (const auto af : {AF_INET, AF_INET6})
{
if (not link->Configure(loop(), "*", af, m_OutboundPort))
if (not link->Configure(this, "*", af, m_OutboundPort))
continue;
#if defined(ANDROID)

View File

@ -76,6 +76,8 @@ namespace llarp
path::BuildLimiter m_PathBuildLimiter;
std::shared_ptr<EventLoopWakeup> m_Pump;
path::BuildLimiter&
pathBuildLimiter() override
{
@ -282,7 +284,10 @@ namespace llarp
RoutePoker m_RoutePoker;
void
PumpLL() override;
TriggerPump() override;
void
PumpLL();
const oxenmq::address DefaultRPCBindAddr = oxenmq::address::tcp("127.0.0.1", 1190);
bool enableRPCServer = false;

View File

@ -56,7 +56,7 @@ namespace llarp
Clear();
size_t idx = 0;
if (not bencode_read_list(
[self = this, &idx](llarp_buffer_t* buffer, bool has) {
[this, &idx](llarp_buffer_t* buffer, bool has) {
if (has)
{
uint64_t i;
@ -65,14 +65,14 @@ namespace llarp
uint64_t val = -1;
if (not bencode_read_integer(buffer, &val))
return false;
self->m_ProtoVersion = val;
m_ProtoVersion = val;
}
else if (bencode_read_integer(buffer, &i))
{
// prevent overflow (note that idx includes version too)
if (idx > self->m_Version.max_size())
if (idx > m_Version.max_size())
return false;
self->m_Version[idx - 1] = i;
m_Version[idx - 1] = i;
}
else
return false;

View File

@ -80,6 +80,14 @@ namespace llarp
}
}
void
Context::Pump()
{
auto now = time_now_ms();
for (auto& [name, endpoint] : m_Endpoints)
endpoint->Pump(now);
}
bool
Context::RemoveEndpoint(const std::string& name)
{

View File

@ -36,6 +36,10 @@ namespace llarp
void
ForEachService(std::function<bool(const std::string&, const Endpoint_ptr&)> visit) const;
/// Pumps the hidden service endpoints, called during Router::PumpLL
void
Pump();
/// add endpoint via config
void
AddEndpoint(const Config& conf, bool autostart = false);

View File

@ -1079,24 +1079,18 @@ namespace llarp
void
Endpoint::FlushRecvData()
{
do
while (auto maybe = m_RecvQueue.tryPopFront())
{
auto maybe = m_RecvQueue.tryPopFront();
if (not maybe)
return;
auto ev = std::move(*maybe);
auto& ev = *maybe;
ProtocolMessage::ProcessAsync(ev.fromPath, ev.pathid, ev.msg);
} while (true);
}
}
void
Endpoint::QueueRecvData(RecvDataEvent ev)
{
if (m_RecvQueue.full() or m_RecvQueue.empty())
{
m_router->loop()->call_soon([this] { FlushRecvData(); });
}
m_RecvQueue.tryPushBack(std::move(ev));
Router()->TriggerPump();
}
bool
@ -1167,6 +1161,7 @@ namespace llarp
|| (msg->proto == ProtocolType::QUIC and m_quic))
{
m_InboundTrafficQueue.tryPushBack(std::move(msg));
Router()->TriggerPump();
return true;
}
if (msg->proto == ProtocolType::Control)
@ -1587,7 +1582,7 @@ namespace llarp
if (*ptr == m_Identity.pub.Addr())
{
ConvoTagTX(tag);
Loop()->wakeup();
m_state->m_Router->TriggerPump();
if (not HandleInboundPacket(tag, pkt, t, 0))
return false;
ConvoTagRX(tag);
@ -1596,7 +1591,6 @@ namespace llarp
}
if (not SendToOrQueue(*maybe, pkt, t))
return false;
Loop()->wakeup();
return true;
}
LogDebug("SendToOrQueue failed: no endpoint for convo tag ", tag);
@ -1610,16 +1604,19 @@ namespace llarp
auto pkt = std::make_shared<net::IPPacket>();
if (!pkt->Load(buf))
return false;
EnsurePathToSNode(addr, [=](RouterID, exit::BaseSession_ptr s, ConvoTag) {
if (s)
{
s->SendPacketToRemote(pkt->ConstBuffer(), t);
}
});
EnsurePathToSNode(
addr, [this, t, pkt = std::move(pkt)](RouterID, exit::BaseSession_ptr s, ConvoTag) {
if (s)
{
s->SendPacketToRemote(pkt->ConstBuffer(), t);
Router()->TriggerPump();
}
});
return true;
}
void Endpoint::Pump(llarp_time_t)
void
Endpoint::Pump(llarp_time_t now)
{
FlushRecvData();
// send downstream packets to user for snode
@ -1658,7 +1655,10 @@ namespace llarp
auto router = Router();
// TODO: locking on this container
for (const auto& [addr, outctx] : m_state->m_RemoteSessions)
{
outctx->FlushUpstream();
outctx->Pump(now);
}
// TODO: locking on this container
for (const auto& [router, session] : m_state->m_SNodeSessions)
session->FlushUpstream();
@ -1673,7 +1673,6 @@ namespace llarp
}
UpstreamFlush(router);
router->linkManager().PumpLinks();
}
std::optional<ConvoTag>
@ -1877,14 +1876,14 @@ namespace llarp
f.S = m->seqno;
f.F = p->intro.pathID;
transfer->P = replyIntro.pathID;
auto self = this;
Router()->QueueWork([transfer, p, m, K, self]() {
if (not transfer->T.EncryptAndSign(*m, K, self->m_Identity))
Router()->QueueWork([transfer, p, m, K, this]() {
if (not transfer->T.EncryptAndSign(*m, K, m_Identity))
{
LogError("failed to encrypt and sign for sessionn T=", transfer->T.T);
return;
}
self->m_SendQueue.tryPushBack(SendEvent_t{transfer, p});
m_SendQueue.tryPushBack(SendEvent_t{transfer, p});
Router()->TriggerPump();
});
return true;
}
@ -1926,10 +1925,10 @@ namespace llarp
traffic[remote].emplace_back(data, t);
EnsurePathToService(
remote,
[self = this](Address addr, OutboundContext* ctx) {
[this](Address addr, OutboundContext* ctx) {
if (ctx)
{
for (auto& pending : self->m_state->m_PendingTraffic[addr])
for (auto& pending : m_state->m_PendingTraffic[addr])
{
ctx->AsyncEncryptAndSendTo(pending.Buffer(), pending.protocol);
}
@ -1938,7 +1937,7 @@ namespace llarp
{
LogWarn("no path made to ", addr);
}
self->m_state->m_PendingTraffic.erase(addr);
m_state->m_PendingTraffic.erase(addr);
},
PathAlignmentTimeout());
return true;

View File

@ -21,19 +21,20 @@ namespace llarp
, m_Endpoint(ep)
, createdAt(ep->Now())
, m_SendQueue(SendContextQueueSize)
{
m_FlushWakeup = ep->Loop()->make_waker([this] { FlushUpstream(); });
}
{}
bool
SendContext::Send(std::shared_ptr<ProtocolFrame> msg, path::Path_ptr path)
{
if (not path->IsReady())
return false;
m_FlushWakeup->Trigger();
return m_SendQueue.tryPushBack(std::make_pair(
std::make_shared<routing::PathTransferMessage>(*msg, remoteIntro.pathID), path))
== thread::QueueReturn::Success;
if (path->IsReady()
and m_SendQueue.tryPushBack(std::make_pair(
std::make_shared<routing::PathTransferMessage>(*msg, remoteIntro.pathID), path))
== thread::QueueReturn::Success)
{
m_Endpoint->Router()->TriggerPump();
return true;
}
return false;
}
void
@ -42,23 +43,18 @@ namespace llarp
auto r = m_Endpoint->Router();
std::unordered_set<path::Path_ptr, path::Path::Ptr_Hash> flushpaths;
auto rttRMS = 0ms;
while (auto maybe = m_SendQueue.tryPopFront())
{
do
auto& [msg, path] = *maybe;
msg->S = path->NextSeqNo();
if (path->SendRoutingMessage(*msg, r))
{
auto maybe = m_SendQueue.tryPopFront();
if (not maybe)
break;
auto& item = *maybe;
item.first->S = item.second->NextSeqNo();
if (item.second->SendRoutingMessage(*item.first, r))
{
lastGoodSend = r->Now();
flushpaths.emplace(item.second);
m_Endpoint->ConvoTagTX(item.first->T.T);
const auto rtt = (item.second->intro.latency + remoteIntro.latency) * 2;
rttRMS += rtt * rtt.count();
}
} while (not m_SendQueue.empty());
lastGoodSend = r->Now();
flushpaths.emplace(path);
m_Endpoint->ConvoTagTX(msg->T.T);
const auto rtt = (path->intro.latency + remoteIntro.latency) * 2;
rttRMS += rtt * rtt.count();
}
}
// flush the select path's upstream
for (const auto& path : flushpaths)

View File

@ -56,8 +56,6 @@ namespace llarp
std::function<void(AuthResult)> authResultListener;
std::shared_ptr<EventLoopWakeup> m_FlushWakeup;
virtual bool
ShiftIntroduction(bool rebuild = true)
{

View File

@ -22,7 +22,6 @@ add_executable(testAll
crypto/test_llarp_crypto.cpp
crypto/test_llarp_key_manager.cpp
dns/test_llarp_dns_dns.cpp
iwp/test_iwp_session.cpp
net/test_ip_address.cpp
net/test_llarp_net.cpp
net/test_sock_addr.cpp

View File

@ -1,312 +0,0 @@
#include <catch2/catch.hpp>
#include <crypto/crypto.hpp>
#include <crypto/crypto_libsodium.hpp>
#include <string_view>
#include <router_contact.hpp>
#include <iwp/iwp.hpp>
#include <util/meta/memfn.hpp>
#include <messages/link_message_parser.hpp>
#include <messages/discard.hpp>
#include <util/time.hpp>
#include <net/net_if.hpp>
#include "ev/ev.hpp"
#undef LOG_TAG
#define LOG_TAG __FILE__
namespace iwp = llarp::iwp;
namespace util = llarp::util;
/// make an iwp link
template <bool inbound, typename... Args>
static llarp::LinkLayer_ptr
make_link(Args&&... args)
{
if (inbound)
return iwp::NewInboundLink(std::forward<Args>(args)...);
return iwp::NewOutboundLink(std::forward<Args>(args)...);
}
/// a single iwp link with associated keys and members to make unit tests work
struct IWPLinkContext
{
llarp::RouterContact rc;
llarp::IpAddress localAddr;
llarp::LinkLayer_ptr link;
std::shared_ptr<llarp::KeyManager> keyManager;
llarp::LinkMessageParser m_Parser;
llarp::EventLoop_ptr m_Loop;
/// is the test done on this context ?
bool gucci = false;
IWPLinkContext(std::string_view addr, llarp::EventLoop_ptr loop)
: localAddr{std::move(addr)}
, keyManager{std::make_shared<llarp::KeyManager>()}
, m_Parser{nullptr}
, m_Loop{std::move(loop)}
{
// generate keys
llarp::CryptoManager::instance()->identity_keygen(keyManager->identityKey);
llarp::CryptoManager::instance()->encryption_keygen(keyManager->encryptionKey);
llarp::CryptoManager::instance()->encryption_keygen(keyManager->transportKey);
// set keys in rc
rc.pubkey = keyManager->identityKey.toPublic();
rc.enckey = keyManager->encryptionKey.toPublic();
}
template <typename Func_t>
void
Call(Func_t work)
{
m_Loop->call_soon(std::move(work));
}
bool
HandleMessage(llarp::ILinkSession* from, const llarp_buffer_t& buf)
{
return m_Parser.ProcessFrom(from, buf);
}
/// initialize link
template <bool inbound>
void
InitLink(std::function<void(llarp::ILinkSession*)> established)
{
link = make_link<inbound>(
keyManager,
m_Loop,
// getrc
[&]() -> const llarp::RouterContact& { return rc; },
// link message handler
util::memFn(&IWPLinkContext::HandleMessage, this),
// sign buffer
[&](llarp::Signature& sig, const llarp_buffer_t& buf) {
REQUIRE(llarp::CryptoManager::instance()->sign(sig, keyManager->identityKey, buf));
return true;
},
// before connect
nullptr,
// established handler
[established](llarp::ILinkSession* s, bool linkIsInbound) {
REQUIRE(s != nullptr);
REQUIRE(inbound == linkIsInbound);
established(s);
return true;
},
// renegotiate handler
[](llarp::RouterContact newrc, llarp::RouterContact oldrc) {
REQUIRE(newrc.pubkey == oldrc.pubkey);
return true;
},
// timeout handler
[&](llarp::ILinkSession*) {
m_Loop->stop();
FAIL("session timeout");
},
// session closed handler
[](llarp::RouterID) {},
// pump done handler
[]() {},
// do work function
[l = m_Loop](llarp::Work_t work) { l->call_soon(work); });
REQUIRE(link->Configure(
m_Loop, llarp::net::LoopbackInterfaceName(), AF_INET, *localAddr.getPort()));
if (inbound)
{
// only add address info on the recipient's rc
rc.addrs.emplace_back();
REQUIRE(link->GetOurAddressInfo(rc.addrs.back()));
}
// sign rc
REQUIRE(rc.Sign(keyManager->identityKey));
REQUIRE(keyManager != nullptr);
}
};
using Context_ptr = std::shared_ptr<IWPLinkContext>;
/// run an iwp unit test after setup
/// call take 2 parameters, test and a timeout
///
/// test is a callable that takes 5 arguments:
/// 0) std::function<EventLoop_ptr(void)> that starts the iwp links and gives an event loop to call with
/// 1) std::function<void(void)> that ends the unit test if we are done
/// 2) std::function<void(void)> that ends the unit test right now as a success
/// 3) client iwp link context (shared_ptr)
/// 4) relay iwp link context (shared_ptr)
///
/// timeout is a std::chrono::duration that tells the driver how long to run the unit test for
/// before it should assume failure of unit test
template <typename Func_t, typename Duration_t = std::chrono::milliseconds>
void
RunIWPTest(Func_t test, Duration_t timeout = 10s)
{
// shut up logs
llarp::LogSilencer shutup;
// set up event loop
auto loop = llarp::EventLoop::create();
llarp::LogContext::Instance().Initialize(
llarp::eLogDebug, llarp::LogType::File, "stdout", "unit test", [loop](auto work) {
loop->call_soon(work);
});
// turn off bogon blocking
auto oldBlockBogons = llarp::RouterContact::BlockBogons;
llarp::RouterContact::BlockBogons = false;
// set up cryptography
llarp::sodium::CryptoLibSodium crypto{};
llarp::CryptoManager manager{&crypto};
// set up client
auto initiator = std::make_shared<IWPLinkContext>("127.0.0.1:3001", loop);
// set up server
auto recipient = std::make_shared<IWPLinkContext>("127.0.0.1:3002", loop);
// function for ending unit test on success
auto endIfDone = [initiator, recipient, loop]() {
if (initiator->gucci and recipient->gucci)
{
loop->stop();
}
};
// function to start test and give loop to unit test
auto start = [initiator, recipient, loop]() {
REQUIRE(initiator->link->Start());
REQUIRE(recipient->link->Start());
return loop;
};
// function to end test immediately
auto endTest = [loop] { loop->stop(); };
loop->call_later(timeout, [] { FAIL("test timeout"); });
test(start, endIfDone, endTest, initiator, recipient);
loop->run();
llarp::RouterContact::BlockBogons = oldBlockBogons;
}
/// ensure clients can connect to relays
TEST_CASE("IWP handshake", "[iwp]")
{
RunIWPTest([](std::function<llarp::EventLoop_ptr(void)> start,
std::function<void(void)> endIfDone,
[[maybe_unused]] std::function<void(void)> endTestNow,
Context_ptr alice,
Context_ptr bob) {
// set up initiator
alice->InitLink<false>([=](auto remote) {
REQUIRE(remote->GetRemoteRC() == bob->rc);
alice->gucci = true;
endIfDone();
});
// set up recipient
bob->InitLink<true>([=](auto remote) {
REQUIRE(remote->GetRemoteRC() == alice->rc);
bob->gucci = true;
endIfDone();
});
// start unit test
auto loop = start();
// try establishing a session
loop->call([link = alice->link, rc = bob->rc]() { REQUIRE(link->TryEstablishTo(rc)); });
});
}
/// ensure relays cannot connect to clients
TEST_CASE("IWP handshake reverse", "[iwp]")
{
RunIWPTest([](std::function<llarp::EventLoop_ptr(void)> start,
[[maybe_unused]] std::function<void(void)> endIfDone,
std::function<void(void)> endTestNow,
Context_ptr alice,
Context_ptr bob) {
alice->InitLink<false>([](auto) {});
bob->InitLink<true>([](auto) {});
// start unit test
auto loop = start();
// try establishing a session in the wrong direction
loop->call([link = bob->link, rc = alice->rc, endTestNow] {
REQUIRE(not link->TryEstablishTo(rc));
endTestNow();
});
});
}
/// ensure iwp can send messages between sessions
TEST_CASE("IWP send messages", "[iwp]")
{
int aliceNumSent = 0;
int bobNumSent = 0;
RunIWPTest([&aliceNumSent, &bobNumSent](std::function<llarp::EventLoop_ptr(void)> start,
std::function<void(void)> endIfDone,
std::function<void(void)> endTestNow,
Context_ptr alice,
Context_ptr bob) {
constexpr int numSend = 64;
// when alice makes a session to bob send `aliceNumSend` messages to him
alice->InitLink<false>([endIfDone, alice, &aliceNumSent](auto session) {
for (auto index = 0; index < numSend; index++)
{
alice->Call([session, endIfDone, alice, &aliceNumSent]() {
// generate a discard message that is 512 bytes long
llarp::DiscardMessage msg;
std::vector<byte_t> msgBuff(512);
llarp_buffer_t buf(msgBuff);
// add random padding
llarp::CryptoManager::instance()->randomize(buf);
// encode the discard message
msg.BEncode(&buf);
// send the message
session->SendMessageBuffer(msgBuff, [endIfDone, alice, &aliceNumSent](auto status) {
if (status == llarp::ILinkSession::DeliveryStatus::eDeliverySuccess)
{
// on successful transmit increment the number we sent
aliceNumSent++;
}
// if we sent all the messages sucessfully we end the unit test
alice->gucci = aliceNumSent == numSend;
endIfDone();
});
});
}
});
bob->InitLink<true>([endIfDone, bob, &bobNumSent](auto session) {
for (auto index = 0; index < numSend; index++)
{
bob->Call([session, endIfDone, bob, &bobNumSent]() {
// generate a discard message that is 512 bytes long
llarp::DiscardMessage msg;
std::vector<byte_t> msgBuff(512);
llarp_buffer_t buf(msgBuff);
// add random padding
llarp::CryptoManager::instance()->randomize(buf);
// encode the discard message
msg.BEncode(&buf);
// send the message
session->SendMessageBuffer(msgBuff, [endIfDone, bob, &bobNumSent](auto status) {
if (status == llarp::ILinkSession::DeliveryStatus::eDeliverySuccess)
{
// on successful transmit increment the number we sent
bobNumSent++;
}
// if we sent all the messages sucessfully we end the unit test
bob->gucci = bobNumSent == numSend;
endIfDone();
});
});
}
});
// start unit test
auto loop = start();
// try establishing a session from alice to bob
loop->call([link = alice->link, rc = bob->rc, endTestNow]() {
REQUIRE(link->TryEstablishTo(rc));
});
});
}

View File

@ -60,6 +60,17 @@ TEST_CASE("Range")
REQUIRE(!llarp::IPRange::FromIPv4(192, 168, 0, 1, 24)
.Contains(llarp::ipaddr_ipv4_bits(10, 200, 0, 253)));
}
SECTION("Intersecting networks")
{
const auto range_16 = llarp::IPRange::FromIPv4(10,9,0,1, 16);
const auto range_24a = llarp::IPRange::FromIPv4(10,9,0,1, 24);
const auto range_24b = llarp::IPRange::FromIPv4(10,9,1,1, 24);
const auto range_unrelated = llarp::IPRange::FromIPv4(1,9,1,1, 8);
REQUIRE(range_16 * range_24a);
REQUIRE(range_16 * range_24b);
REQUIRE(not(range_24a * range_24b));
REQUIRE(not(range_16 * range_unrelated));
}
}
TEST_CASE("IPv4 netmask")