MAX_SOCKETS wasn't working properly because ZMQ uses it when the context
is initialized, which happens when the first socket is constructed on
that context.
For OxenMQ, we had several sockets constructed on the context during
OxenMQ construction, which meant the context_t was being initialized
during OxenMQ construction, rather than during start(), and so setting
MAX_SOCKETS would have no effect and you'd always get the default.
This fixes it by making all the member variable zmq::socket_t's
default-constructed, then replacing them with proper zmq::socket_t's
during startup() so that we also defer zmq::context_t initialization to
the right place.
A second issue found during testing (also fixed here) is that the socket
worker threads use to communicate to the proxy could fail if the worker
socket creation would violate the zmq max sockets limit, which wound up
throwing an uncaught exception and aborting. This pre-initializes (but
doesn't connect) all potential worker threads sockets during start() so
that the lazily-initialized worker thread will have one already set up
rather than having to create a new one (which could fail).
bt_*, hex, base32z, base64 all moved to oxen-encoding a while ago; this
finishes the move by removing them from oxenmq and instead making oxenmq
depend on oxen-encoding.
This allows for on-the-fly encoding/decoding, and also allows for
on-the-fly transcoding between types without needing intermediate string
allocations (see added test cases for examples).
- Add {to,from}_{base64,base32z,hex}_size functions to calculate the
resulting output size from a given input size.
- Use it internally
- Make b32z and b64 validity checking slightly stricter: currently we
"accept" some b32z and b64 strings that contain an extra character
that leave us with 5-7 trailing bits (base32z) or 6 trailing bits
(base64). We simply ignore the extra one if decoding, but we
shouldn't accept it in the "is valid" calls.
Changes the 3-iterator versions of to_hex, from_b32z, etc. to return the
final output iterator, which allows for much easier in-place "from"
conversion without needing a new string by doing something like:
std::string data = /* some hex */;
auto end = oxenmq::from_hex(data.begin(), data.end(), data.begin();
data.erase(end, data.end());
Returning from the "to" converters is a bit less useful but doing it
anyway for consistency (and because it could still have some use, e.g.
if output is into some fixed buffer it lets you determine how much was
written).
inproc support is special in zmq: in particular it completely bypasses
the auth layer, which causes problems in OxenMQ because we assume that a
message will always have auth information (set during initial connection
handshake).
This adds an "always-on" inproc listener and adds a new `connect_inproc`
method for a caller to establish a connection to it.
It also throws exceptions if you try to `listen_plain` or `listen_curve`
on an inproc address, because that won't work for the reasons detailed
above.
The recent PR that revamped the connection IDs missed a case when
connecting to service nodes where we store the SN pubkey in peers, but
then fail to find the peer when we look it up by connection id.
This adds the required tracking to fix that case (and adds a test that
fails without the fix here).
- Allow up to 200ms (instead of 100ms) for the things we are waiting on
to become available, to prevent occasional spurious failures.
- Add unscoped info for how long we waited.
- Avoid calling into oxenmq with the catch lock held in the "hey google"
tests (because this will deadlock if the oxenmq call invokes any
logging).
- Replace an old std::cerr logger with the updated catch2 logger.
This commit adds support for listening on new ports after startup. This
will make things easier in storage server, in particular, where we want
to delay listening on public ports until we have an established
connection and initial block status update from oxend.
I realized after merging the previous PR that it is difficult to
correctly pass ownership into a timer, because something like:
TimerID x = omq.add_timer([&] { omq.cancel_timer(x); }, 5ms);
doesn't work when the timer job needs to outlive the caller. My next
approach was:
auto x = std::make_shared<TimerID>();
*x = omq.add_timer([&omq, x] { omq.cancel_timer(*x); }, 5ms);
but this has two problems: first, TimerID wasn't default constructible,
and second, there is no guarantee that the assignment to *x happens
before (and is visible to) the access for the cancellation.
This commit fixes both issues: TimerID is now default constructible, and
an overload is added that takes the lvalue reference to the TimerID to
set rather than returning it (and guarantees that it will be set before
the timer is created).
Updates `add_timer` to return a new opaque TimerID object that can later
be passed to `cancel_timer` to cancel an existing timer.
Also adds timer tests, which was omitted (except for one in the tagged
threads section), along with a new test for timer deletion.
This provides an interface for sending a reply to a message later (i.e.
after the Message& itself is no longer valid) by using a new
`send_later()` method of the Message instance that returns an object
that can properly route replies (and can outlive the Message it was
called on).
Intended use is:
run_this_lambda_later([send=msg.send_later()] {
send.reply("content");
});
which is equivalent to:
run_this_lambda_later([&msg] {
msg.send_reply("content");
});
except that it works properly even if the lambda is invoked beyond the
lifetime of `msg`.
Decoding into a std::byte output iterator was not working because the
`*out++ = val` assignment doesn't work when the output is std::byte and
val is a char/unsigned char/uint8_t. Instead we need to explicitly
cast, but figuring out what we have to cast to is a little bit tricky.
This PR makes it work (and bumps the version for this and the is_hex
fix).
`is_hex()` is a bit misleading as `from_hex()` requires an even-length
hex string, but `is_hex()` also allows odd-length hex strings, which
means currently callers should be doing `if (lokimq::is_hex(str) &&
str.size() % 2 == 0)`, but probably aren't.
Since the main point of `lokimq/hex.h` is for byte<->hex conversions it
doesn't make much sense to allow `is_hex()` to return true for something
that can't be validly decoded via `from_hex()`, thus this PR changes it
to return false.
If someone *really* wants to test for an odd-length hex string (though
I'm skeptical that there is a need for this), this also exposes
`is_hex_digit` so that they could use:
bool all_hex = std::all_of(str.begin(), str.end(), lokimq::is_hex_digit<char>)
Apple, in particular, often fails tests with an address already in use
if attempt to reuse a port that the process just closed, because it is a
wonderful OS.
Add var::get/var::visit implementations of std::get/std::visit that get
used if compiling for an old macos target, and use those.
The issue is that on a <10.14 macos target Apple's libc++ is missing
std::bad_variant_access, and so any method that can throw it (such as
std::get and std::visit) can't be used. This workaround is ugly, but
such is life when you want to support running on Apple platforms.
On the wire they are just lists, but this lets you put tuples onto and
pull tuples off of the wire. (Also supports std::pair).
Supports direct serialization (via bt_serialize()/bt_deserialize()),
list/dict consumer deserialization, and conversion from a bt_value or
bt_list via a new bt_tuple() function.
data_parts() wasn't currently used anywhere, and was broken: it was
calling bt_deserialize which was just wrong.
This repurposes it to take iterators over strings (or string-like types)
and append those parts as message parts.
Also adds tests for it.
This allows mixing some outside task into the lokimq job queue for a
category (queued up with native LMQ requests for that category) for use
when there is some external process that is able to generate messages.
For example, the most immediate use for this is to allow an HTTP server
to handle incoming RPC requests and, as soon as they arrive, inject them
into LokiMQ's queue for the "rpc" category so that native LMQ rpc
requests and HTTP rpc requests share the same thread pool and queue.
These injected jobs bypass all of LokiMQ's authentication and response
mechanisms: that's up to the invoked callback itself to manage.
Injected tasks are somewhat similar to batch jobs, but unlike batch jobs
the are queued and prioritized as ordinary external LokiMQ requests.
(Batch jobs, in contrast, have a higher scheduling priority, no queue
limits, and typically a larger available thread pool).
Deprecates the existing connect_remote() that takes remote addr and
pubkey as separate strings, just taking a `address` instead (into which
the caller can set pubkey/curve data as desired).
Also slightly changes how `connect_remote()` works when called with a
string remote but no pubkey: that string is now an augmented
lokimq::address string so that it can use the various formats supported
by `lokimq::address`.
(This was meant to be included in the PR that added `address` but
apparently didn't get implemented.)
There can be a spurious failure here if the backdoor_details element
hasn't been added yet, so lock & check it when waiting for the test
conditions.
The weirdest thing about this error is that it can fail but then when
expanding values they expand to *correct* values, i.e. so you get:
FAILED:
REQUIRE( backdoor_details == all_the_things )
with expansion:
{ "Alaska", "I'm the luckiest man in the world", "Loretta", "because all my
life are belong to Google", "moustache hatred", "photos", "scallops",
"snorted when she laughed", "tickled pink" }
==
{ "Alaska", "I'm the luckiest man in the world", "Loretta", "because all my
life are belong to Google", "moustache hatred", "photos", "scallops",
"snorted when she laughed", "tickled pink" }
The init function doesn't seem all that useful and makes the interface a
bit more complicated, so drop it.
Also addresses a race condition that can happen with tagged thread
startup when the proxy tries to talk to a tagged thread but the tagged
thread hasn't connected yet (which then aborts the proxy because it
assumes workers are always routable).
This renames the class to make it clearer what it does, and drops the
.name attribute from it so that it can cheaply be passed around. This
then means it can be cheaply passed by value (using std::optionals)
rather than by pointer when specifying a thread.
This adds to ability to have lokimq manage specific threads to which
jobs (individual, batch jobs, batch completions, or timers) can be
directed to. This allows dedicating a thread to some slow or
thread-unsafe action where you can dump jobs to the tagged thread as
a method of lockless job queuing.
We call libsodium functions which require a sodium_init() call; this is
usually a no-op (zmq will have already called it for us), but in case
zmq is built with tweetnacl instead of sodium we need to call it before
we call it directly in the LokiMQ ctor and the test suite.