Add request/reply abstraction

This allows making RPC requests with a callback that gets called when
the response comes back.  The is essentially a wrapper around doing it
yourself (i.e. by setting up a server-side "request" and client-side
"reply" command where "request" responds with a "reply" command), but
abstracted into lokimq itself as it is likely to be very useful when
integrating client/server connections rather than peer-to-peer
This commit is contained in:
Jason Rhinelander 2020-02-11 02:30:07 -04:00
parent 061bdee0a8
commit ccfb6d080b
5 changed files with 724 additions and 200 deletions

View File

@ -1,6 +1,8 @@
#include <queue>
#include <map>
#include <cassert>
#include <random>
extern "C" {
#include <sodium.h>
@ -102,6 +104,7 @@ void send_routed_message(zmq::socket_t &socket, std::string route, std::string m
send_message_parts(socket, msgs.begin(), data.empty() ? std::prev(msgs.end()) : msgs.end());
// Sends some stuff to a socket directly.
void send_direct_message(zmq::socket_t &socket, std::string msg, std::string data = {}) {
std::array<zmq::message_t, 2> msgs{{create_message(std::move(msg))}};
if (!data.empty())
@ -142,18 +145,23 @@ std::string zmtp_metadata(string_view key, string_view value) {
return result;
void check_not_started(const std::thread& proxy_thread) {
void check_started(const std::thread& proxy_thread, const std::string &verb) {
if (!proxy_thread.joinable())
throw std::logic_error("Cannot " + verb + " before calling `start()`");
void check_not_started(const std::thread& proxy_thread, const std::string &verb) {
if (proxy_thread.joinable())
throw std::logic_error("Cannot add categories/commands/aliases after calling `start()`");
throw std::logic_error("Cannot " + verb + " after calling `start()`");
// Extracts and builds the "send" part of a message for proxy_send/proxy_reply
std::list<zmq::message_t> build_send_parts(bt_dict &data, const std::string &route) {
std::list<zmq::message_t> build_send_parts(bt_list_consumer send, string_view route) {
std::list<zmq::message_t> parts;
if (!route.empty())
for (auto &s :"send").get<bt_list>())
while (!send.is_finished())
return parts;
@ -229,7 +237,7 @@ LogLevel LokiMQ::log_level() const {
void LokiMQ::add_category(std::string name, Access access_level, unsigned int reserved_threads, int max_queue) {
check_not_started(proxy_thread, "add a category");
if (name.size() > MAX_CATEGORY_LENGTH)
throw std::runtime_error("Invalid category name `" + name + "': name too long (> " + std::to_string(MAX_CATEGORY_LENGTH) + ")");
@ -245,7 +253,7 @@ void LokiMQ::add_category(std::string name, Access access_level, unsigned int re
void LokiMQ::add_command(const std::string& category, std::string name, CommandCallback callback) {
check_not_started(proxy_thread, "add a command");
if (name.size() > MAX_COMMAND_LENGTH)
throw std::runtime_error("Invalid command name `" + name + "': name too long (> " + std::to_string(MAX_COMMAND_LENGTH) + ")");
@ -258,13 +266,18 @@ void LokiMQ::add_command(const std::string& category, std::string name, CommandC
if (command_aliases.count(fullname))
throw std::runtime_error("Cannot add command `" + fullname + "': a command alias with that name is already defined");
auto ins = catit->second.commands.emplace(std::move(name), std::move(callback));
auto ins = catit->second.commands.insert({std::move(name), {std::move(callback), false}});
if (!ins.second)
throw std::runtime_error("Cannot add command `" + fullname + "': that command already exists");
void LokiMQ::add_request_command(const std::string& category, std::string name, CommandCallback callback) {
add_command(category, name, std::move(callback)); = true;
void LokiMQ::add_command_alias(std::string from, std::string to) {
check_not_started(proxy_thread, "add a command alias");
if (from.empty())
throw std::runtime_error("Cannot add an alias for empty command");
@ -299,6 +312,8 @@ std::mutex control_sockets_mutex;
/// commands in a thread-safe manner. A mutex is only required here the first time a thread
/// accesses the control socket.
zmq::socket_t& LokiMQ::get_control_socket() {
// Maps the LokiMQ unique ID to a local thread command socket.
static thread_local std::map<int, std::shared_ptr<zmq::socket_t>> control_sockets;
static thread_local std::pair<int, std::shared_ptr<zmq::socket_t>> last{-1, nullptr};
@ -338,7 +353,7 @@ LokiMQ::LokiMQ(
AllowFunc allow,
Logger logger)
: object_id{next_id++}, pubkey{std::move(pubkey_)}, privkey{std::move(privkey_)}, local_service_node{service_node},
bind{std::move(bind_)}, peer_lookup{std::move(lookup)}, allow_connection{std::move(allow)}, logger{logger},
bind{std::move(bind_)}, peer_lookup{std::move(lookup)}, allow_connection{std::move(allow)}, logger{std::move(logger)},
poll_remote_offset{poll_internal_size + (bind.empty() ? 0 : 1)} {
LMQ_LOG(trace, "Constructing listening LokiMQ, id=", object_id, ", this=", this);
@ -404,11 +419,7 @@ void LokiMQ::start() {
void LokiMQ::worker_thread(unsigned int index) {
std::string worker_id = "w" + std::to_string(index);
zmq::socket_t sock{context, zmq::socket_type::dealer};
sock.setsockopt(ZMQ_ROUTING_ID,, worker_id.size());
sock.setsockopt(ZMQ_IDENTITY,, worker_id.size());
LMQ_LOG(debug, "New worker thread ", worker_id, " started");
@ -427,14 +438,21 @@ void LokiMQ::worker_thread(unsigned int index) {
} else {
message.pubkey = {, 32};
message.pubkey = run.pubkey;
message.service_node = run.service_node;;
if (run.callback->second /*is_request*/) {
message.reply_tag = {run.data_parts[0].data<char>(), run.data_parts[0].size()};
for (auto it = run.data_parts.begin() + 1; it != run.data_parts.end(); ++it)>data<char>(), it->size());
} else {
for (auto& m : run.data_parts)<char>(), m.size());
LMQ_LOG(trace, "worker thread ", worker_id, " invoking ", run.command, " callback with ",, " message parts");
@ -546,8 +564,19 @@ void LokiMQ::proxy_quit() {
LMQ_LOG(debug, "Proxy thread teardown complete");
void LokiMQ::setup_outgoing_socket(zmq::socket_t& socket, string_view remote_pubkey) {
// FIXME: not passing a remote_pubkey is a problem
if (!remote_pubkey.empty())
socket.setsockopt(ZMQ_CURVE_SERVERKEY,, remote_pubkey.size());
socket.setsockopt(ZMQ_CURVE_PUBLICKEY,, pubkey.size());
socket.setsockopt(ZMQ_CURVE_SECRETKEY,, privkey.size());
socket.setsockopt(ZMQ_HANDSHAKE_IVL, (int) HANDSHAKE_TIME.count());
socket.setsockopt<int64_t>(ZMQ_MAXMSGSIZE, MAX_MSG_SIZE);
socket.setsockopt(ZMQ_ROUTING_ID,, pubkey.size());
std::pair<zmq::socket_t *, std::string>
LokiMQ::proxy_connect(const std::string &remote, const std::string &connect_hint, bool optional, bool incoming_only, std::chrono::milliseconds keep_alive) {
LokiMQ::proxy_connect_sn(const std::string &remote, const std::string &connect_hint, bool optional, bool incoming_only, std::chrono::milliseconds keep_alive) {
auto &peer = peers[remote]; // We may auto-vivify here, but that's okay; it'll get cleaned up in idle_expiry if no connection gets established
std::pair<zmq::socket_t *, std::string> result = {nullptr, ""s};
@ -598,16 +627,7 @@ LokiMQ::proxy_connect(const std::string &remote, const std::string &connect_hint
LMQ_LOG(debug, to_hex(pubkey), " connecting to ", addr, " to reach ", to_hex(remote));
zmq::socket_t socket{context, zmq::socket_type::dealer};
socket.setsockopt(ZMQ_CURVE_SERVERKEY,, remote.size());
socket.setsockopt(ZMQ_CURVE_PUBLICKEY,, pubkey.size());
socket.setsockopt(ZMQ_CURVE_SECRETKEY,, privkey.size());
socket.setsockopt<int64_t>(ZMQ_MAXMSGSIZE, SN_ZMQ_MAX_MSG_SIZE);
socket.setsockopt(ZMQ_ROUTING_ID,, pubkey.size());
socket.setsockopt(ZMQ_IDENTITY,, pubkey.size());
setup_outgoing_socket(socket, remote);
peer.idle_expiry = keep_alive;
@ -621,7 +641,7 @@ LokiMQ::proxy_connect(const std::string &remote, const std::string &connect_hint
return result;
std::pair<zmq::socket_t *, std::string> LokiMQ::proxy_connect(bt_dict &&data) {
std::pair<zmq::socket_t *, std::string> LokiMQ::proxy_connect_sn(bt_dict &&data) {
auto remote_pubkey ="pubkey").get<std::string>();
std::chrono::milliseconds keep_alive{get_int<int>("keep-alive"))};
std::string hint;
@ -631,24 +651,44 @@ std::pair<zmq::socket_t *, std::string> LokiMQ::proxy_connect(bt_dict &&data) {
bool optional = data.count("optional"), incoming = data.count("incoming");
return proxy_connect(remote_pubkey, hint, optional, incoming, keep_alive);
return proxy_connect_sn(remote_pubkey, hint, optional, incoming, keep_alive);
void LokiMQ::proxy_send(bt_dict &&data) {
const auto &remote_pubkey ="pubkey").get<std::string>();
void LokiMQ::proxy_send(bt_dict_consumer data) {
// NB: bt_dict_consumer goes in alphabetical order
std::string hint;
auto hint_it = data.find("hint");
if (hint_it != data.end())
hint = hint_it->second.get<std::string>();
std::chrono::milliseconds keep_alive{DEFAULT_SEND_KEEP_ALIVE};
bool optional = false;
bool incoming = false;
bool request = false;
std::string request_tag;
std::unique_ptr<ReplyCallback> request_cbptr;
if (data.skip_until("hint"))
hint = data.consume_string();
if (data.skip_until("incoming"))
incoming = data.consume_integer<bool>();
if (data.skip_until("keep-alive"))
keep_alive = std::chrono::milliseconds{data.consume_integer<uint64_t>()};
if (data.skip_until("optional"))
optional = data.consume_integer<bool>();
if (!data.skip_until("pubkey"))
throw std::runtime_error("Internal error: Invalid proxy send command; pubkey missing");
std::string remote_pubkey = data.consume_string();
if (data.skip_until("request"))
request = data.consume_integer<bool>();
if (request) {
if (!data.skip_until("request_callback"))
throw std::runtime_error("Internal error: received request without request_callback");
if (!data.skip_until("request_tag"))
throw std::runtime_error("Internal error: received request without request_name");
request_tag = data.consume_string();
if (!data.skip_until("send"))
throw std::runtime_error("Internal error: Invalid proxy send command; send parts missing");
bt_list_consumer send = data.consume_list_consumer();
auto idle_it = data.find("keep-alive");
std::chrono::milliseconds keep_alive = idle_it != data.end()
? std::chrono::milliseconds{get_int<uint64_t>(idle_it->second)}
bool optional = data.count("optional"), incoming = data.count("incoming");
auto sock_route = proxy_connect(remote_pubkey, hint, optional, incoming, keep_alive);
auto sock_route = proxy_connect_sn(remote_pubkey, hint, optional, incoming, keep_alive);
if (!sock_route.first) {
if (optional)
LMQ_LOG(debug, "Not sending: send is optional and no connection to ", to_hex(remote_pubkey), " is currently established");
@ -656,8 +696,14 @@ void LokiMQ::proxy_send(bt_dict &&data) {
LMQ_LOG(error, "Unable to send to ", to_hex(remote_pubkey), ": no connection could be established");
if (request) {
pending_requests.insert({ request_tag, {
std::chrono::steady_clock::now() + REQUEST_TIMEOUT, std::move(*request_cbptr) }});
try {
send_message_parts(*sock_route.first, build_send_parts(data, sock_route.second));
send_message_parts(*sock_route.first, build_send_parts(send, sock_route.second));
} catch (const zmq::error_t &e) {
if (e.num() == EHOSTUNREACH && sock_route.first == &listener && !sock_route.second.empty()) {
// We *tried* to route via the incoming connection but it is no longer valid. Drop it,
@ -671,16 +717,25 @@ void LokiMQ::proxy_send(bt_dict &&data) {
void LokiMQ::proxy_reply(bt_dict &&data) {
const auto &route ="route").get<std::string>();
void LokiMQ::proxy_reply(bt_dict_consumer data) {
// NB: bt_dict_consumer goes in alphabetical order
string_view route = data.consume_string();
if (!listener.connected()) {
// FIXME: this is wrong; we can reply to something even with no listener (e.g. if client
// says A, server replies B, client replies to that with C).
LMQ_LOG(error, "Internal error: proxy_reply called but that shouldn't be possible as we have no listener!");
if (!data.skip_until("send"))
throw std::runtime_error("Internal error: Invalid proxy reply command; send parts missing");
bt_list_consumer send = data.consume_list_consumer();
try {
send_message_parts(listener, build_send_parts(data, route));
send_message_parts(listener, build_send_parts(send, route));
} catch (const zmq::error_t &err) {
if (err.num() == EHOSTUNREACH) {
LMQ_LOG(info, "Unable to send reply to incoming non-SN request: remote is no longer connected");
@ -690,8 +745,8 @@ void LokiMQ::proxy_reply(bt_dict &&data) {
void LokiMQ::proxy_batch(detail::Batch* batchptr) {
auto& batch = *batches.emplace(batchptr).first;
void LokiMQ::proxy_batch(detail::Batch* batch) {
const int jobs = batch->size();
for (int i = 0; i < jobs; i++)
batch_jobs.emplace(batch, i);
@ -735,18 +790,22 @@ void LokiMQ::proxy_control_message(std::vector<zmq::message_t>& parts) {
auto route = view(parts[0]), cmd = view(parts[1]);
LMQ_LOG(trace, "control message: ", cmd);
if (parts.size() == 3) {
LMQ_LOG(trace, "...: ", parts[2]);
if (cmd == "SEND") {
LMQ_LOG(trace, "proxying message");
return proxy_send(bt_deserialize<bt_dict>(view(parts[2])));
return proxy_send(view(parts[2]));
} else if (cmd == "REPLY") {
LMQ_LOG(trace, "proxying reply to non-SN incoming message");
return proxy_reply(bt_deserialize<bt_dict>(view(parts[2])));
return proxy_reply(view(parts[2]));
} else if (cmd == "BATCH") {
LMQ_LOG(trace, "proxy batch jobs");
auto ptrval = bt_deserialize<uintptr_t>(view(parts[2]));
return proxy_batch(reinterpret_cast<detail::Batch*>(ptrval));
} else if (cmd == "CONNECT") {
} else if (cmd == "CONNECT_SN") {
} else if (cmd == "CONNECT_REMOTE") {
} else if (cmd == "TIMER") {
return proxy_timer(view(parts[2]));
@ -771,20 +830,30 @@ void LokiMQ::proxy_control_message(std::vector<zmq::message_t>& parts) {
" (" + std::to_string(parts.size()) + ")");
void LokiMQ::proxy_close_remote(int index, bool linger) {
remotes[index].second.setsockopt<int>(ZMQ_LINGER, linger ? std::chrono::milliseconds{CLOSE_LINGER}.count() : 0);
pollitems.erase(pollitems.begin() + poll_remote_offset + index);
remotes.erase(remotes.begin() + index);
assert(remotes.size() == pollitems.size() + poll_remote_offset);
for (auto& p : peers)
if (p.second.outgoing > index)
for (auto& pc : pending_connects) {
auto& i = std::get<int>(pc);
if (i > index)
auto LokiMQ::proxy_close_outgoing(decltype(peers)::iterator it) -> decltype(it) {
auto &peer = *it;
auto &info = peer.second;
if (info.outgoing >= 0) {
remotes[info.outgoing].second.setsockopt<int>(ZMQ_LINGER, std::chrono::milliseconds{CLOSE_LINGER}.count());
pollitems.erase(pollitems.begin() + poll_remote_offset + info.outgoing);
remotes.erase(remotes.begin() + info.outgoing);
assert(remotes.size() == pollitems.size() + poll_remote_offset);
for (auto &p : peers)
if (p.second.outgoing > info.outgoing)
info.outgoing = -1;
@ -822,6 +891,42 @@ void LokiMQ::proxy_expire_idle_peers() {
void LokiMQ::proxy_conn_cleanup() {
// Drop idle connections (if we haven't done it in a while) but *only* if we have some idle
// general workers: if we don't have any idle workers then we may still have incoming messages which
// we haven't processed yet and those messages might end up resetting the last activity time.
if (workers.size() < general_workers) {
LMQ_LOG(trace, "closing idle connections");
auto now = std::chrono::steady_clock::now();
// Check any pending outgoing connections for timeout
for (auto it = pending_connects.begin(); it != pending_connects.end(); ) {
auto& pc = *it;
if (std::get<1>(pc) < now) {
job([callback = std::move(std::get<3>(pc))] { callback("connection attempt timed out"); });
int index = std::get<0>(pc);
it = pending_connects.erase(it);
proxy_close_remote(index, false /*linger*/);
} else {
// Remove any expired pending requests and schedule their callback with a failure
for (auto it = pending_requests.begin(); it != pending_requests.end(); ) {
auto& callback = it->second;
if (callback.first < now) {
job([callback = std::move(callback.second)] { callback(false, {}); });
it = pending_requests.erase(it);
} else {
void LokiMQ::proxy_loop() {
zmq::socket_t zap_auth{context, zmq::socket_type::rep};
zap_auth.setsockopt<int>(ZMQ_LINGER, 0);
@ -839,9 +944,10 @@ void LokiMQ::proxy_loop() {
if (log_level() >= LogLevel::trace) {
LMQ_LOG(trace, "Reserving space for ", max_workers, " max workers = ", general_workers, " general + category reserved:");
LMQ_LOG(trace, "Reserving space for ", max_workers, " max workers = ", general_workers, " general + reserved:");
for (const auto& cat : categories)
LMQ_LOG(trace, " - ", cat.first, ": ", cat.second.reserved_threads);
LMQ_LOG(trace, " - (batch jobs): ", batch_jobs_reserved);
@ -860,7 +966,8 @@ void LokiMQ::proxy_loop() {
listener.setsockopt<int>(ZMQ_CURVE_SERVER, 1);
listener.setsockopt(ZMQ_CURVE_PUBLICKEY,, pubkey.size());
listener.setsockopt(ZMQ_CURVE_SECRETKEY,, privkey.size());
listener.setsockopt<int64_t>(ZMQ_MAXMSGSIZE, SN_ZMQ_MAX_MSG_SIZE);
listener.setsockopt(ZMQ_HANDSHAKE_IVL, (int) HANDSHAKE_TIME.count());
listener.setsockopt<int64_t>(ZMQ_MAXMSGSIZE, MAX_MSG_SIZE);
listener.setsockopt<int>(ZMQ_ROUTER_HANDOVER, 1);
listener.setsockopt<int>(ZMQ_ROUTER_MANDATORY, 1);
@ -879,11 +986,19 @@ void LokiMQ::proxy_loop() {
assert(pollitems.size() == poll_remote_offset);
constexpr auto timeout_check_interval = 10000ms; // Minimum time before for checking for connections to close since the last check
auto last_conn_timeout = std::chrono::steady_clock::now();
if (!timers)
auto do_conn_cleanup = [this] { proxy_conn_cleanup(); };
using CleanupLambda = decltype(do_conn_cleanup);
if (-1 == zmq_timers_add(timers.get(),
// Wrap our lambda into a C function pointer where we pass in the lambda pointer as extra arg
[](int /*timer_id*/, void* cleanup) { (*static_cast<CleanupLambda*>(cleanup))(); },
&do_conn_cleanup)) {
throw zmq::error_t{};
std::vector<zmq::message_t> parts;
while (true) {
@ -932,9 +1047,9 @@ void LokiMQ::proxy_loop() {
// We round-robin connections when pulling off pending messages one-by-one rather than
// pulling off all messages from one connection before moving to the next; thus in cases of
// contention we end up fairly distributing.
const size_t num_sockets = remotes.size() + listener.connected();
std::queue<size_t> queue_index;
for (size_t i = 0; i < num_sockets; i++)
const int num_sockets = remotes.size() + listener.connected();
std::queue<int> queue_index;
for (int i = 0; i < num_sockets; i++)
for (parts.clear(); !queue_index.empty() && workers.size() < max_workers; parts.clear()) {
@ -958,23 +1073,11 @@ void LokiMQ::proxy_loop() {
proxy_to_worker(i, parts);
// Drop idle connections (if we haven't done it in a while) but *only* if we have some idle
// general workers: if we don't have any idle workers then we may still have incoming messages which
// we haven't processed yet and those messages might end up reset the last activity time.
if (workers.size() < general_workers) {
auto now = std::chrono::steady_clock::now();
if (now - last_conn_timeout >= timeout_check_interval) {
LMQ_LOG(trace, "closing idle connections");
last_conn_timeout = now;
LMQ_LOG(trace, "done proxy loop");
std::pair<LokiMQ::category*, const LokiMQ::CommandCallback*> LokiMQ::get_command(std::string& command) {
std::pair<LokiMQ::category*, const std::pair<LokiMQ::CommandCallback, bool>*> LokiMQ::get_command(std::string& command) {
if (command.size() > MAX_CATEGORY_LENGTH + 1 + MAX_COMMAND_LENGTH) {
LMQ_LOG(warn, "Invalid command '", command, "': command too long");
return {};
@ -1082,15 +1185,21 @@ void LokiMQ::proxy_worker_message(std::vector<zmq::message_t>& parts) {
decltype(LokiMQ::peers)::iterator LokiMQ::proxy_lookup_peer(zmq::message_t& msg) {
decltype(LokiMQ::peers)::iterator LokiMQ::proxy_lookup_peer(int conn_index, zmq::message_t& msg) {
bool is_outgoing_conn = !listener.connected() || conn_index > 0;
std::string pubkey;
bool service_node;
bool service_node = false;
if (!is_outgoing_conn) {
try {
extract_pubkey(msg, pubkey, service_node);
} catch (...) {
LMQ_LOG(error, "Internal error: message metadata not set or invalid; dropping message");
throw std::out_of_range("message pubkey metadata invalid");
} else {
pubkey = remotes[conn_index].first;
auto it = peers.find(pubkey);
if (it == peers.end())
@ -1099,11 +1208,77 @@ decltype(LokiMQ::peers)::iterator LokiMQ::proxy_lookup_peer(zmq::message_t& msg)
return it;
bool LokiMQ::proxy_handle_builtin(size_t conn_index, std::vector<zmq::message_t>& parts) {
(void) conn_index; // FIXME
auto cmd = view(parts.front());
if (cmd == "BYE") {
auto pit = proxy_lookup_peer(parts.front());
// Return true if we recognized/handled the builtin command (even if we reject it for whatever
// reason)
bool LokiMQ::proxy_handle_builtin(int conn_index, std::vector<zmq::message_t>& parts) {
string_view route, cmd;
bool is_outgoing_conn = !listener.connected() || conn_index > 0;
if (parts.size() < (is_outgoing_conn ? 1 : 2)) {
LMQ_LOG(warn, "Received empty message; ignoring");
return true;
if (is_outgoing_conn) {
cmd = view(parts[0]);
} else {
route = view(parts[0]);
cmd = view(parts[1]);
LMQ_LOG(trace, "Checking for builtins: ", cmd, " from ", peer_address(parts.back()));
if (cmd == "REPLY") {
size_t tag_pos = (is_outgoing_conn ? 1 : 2);
if (parts.size() <= tag_pos) {
LMQ_LOG(warn, "Received REPLY without a reply tag; ignoring");
return true;
std::string reply_tag = view(parts[1]);
auto it = pending_requests.find(reply_tag);
if (it != pending_requests.end()) {
LMQ_LOG(debug, "Received REPLY for pending command; scheduling callback");
std::vector<std::string> data;
data.reserve(parts.size() - (tag_pos + 1));
for (auto it = parts.begin() + (tag_pos + 1); it != parts.end(); ++it)
proxy_schedule_job([callback=std::move(it->second.second), data=std::move(data)] {
callback(true, std::move(data));
} else {
LMQ_LOG(warn, "Received REPLY with unknown or already handled reply tag (", to_hex(reply_tag), "); ignoring");
return true;
} else if (cmd == "HI") {
if (is_outgoing_conn) {
LMQ_LOG(warn, "Got invalid 'HI' message on an outgoing connection; ignoring");
return true;
LMQ_LOG(info, "Incoming client from ", peer_address(parts.back()), " send HI, replying with HELLO");
send_routed_message(listener, route, "HELLO");
return true;
} else if (cmd == "HELLO") {
if (!is_outgoing_conn) {
LMQ_LOG(warn, "Got invalid 'HELLO' message on an incoming connection; ignoring");
return true;
auto it = std::find_if(pending_connects.begin(), pending_connects.end(),
[&](auto& pc) { return std::get<0>(pc) == conn_index; });
if (it == pending_connects.end()) {
LMQ_LOG(warn, "Got invalid 'HELLO' message on an already handshaked incoming connection; ignoring");
return true;
LMQ_LOG(info, "Got initial HELLO server response from ", peer_address(parts.back()));
size_t pksize = 32;
remotes[conn_index].second.getsockopt(ZMQ_CURVE_SERVERKEY, &remotes[conn_index].first[0], &pksize);
auto &peer = peers[remotes[conn_index].first];
peer.idle_expiry = 365 * 24h;
peer.outgoing = conn_index;
peer.service_node = false;
proxy_schedule_job([on_success=std::move(std::get<2>(*it)), pk=remotes[conn_index].first] { on_success(std::move(pk)); });
return true;
} else if (cmd == "BYE") {
auto pit = proxy_lookup_peer(conn_index, parts.front());
return true;
@ -1191,7 +1366,7 @@ void LokiMQ::proxy_process_queue() {
void LokiMQ::proxy_to_worker(size_t conn_index, std::vector<zmq::message_t>& parts) {
auto pit = proxy_lookup_peer(parts.back());
auto pit = proxy_lookup_peer(conn_index, parts.back());
string_view pubkey = pit->first;
auto& peer_info = pit->second;
@ -1234,6 +1409,11 @@ void LokiMQ::proxy_to_worker(size_t conn_index, std::vector<zmq::message_t>& par
if (cat_call.second->second /*is_request*/ && data_parts.empty()) {
LMQ_LOG(warn, "Received an invalid request command with no reply tag; dropping message");
auto& run = get_idle_worker();
run.is_batch_job = false; = &category;
@ -1411,11 +1591,81 @@ LokiMQ::~LokiMQ() {
LMQ_LOG(info, "LokiMQ proxy thread has stopped");
void LokiMQ::connect(const std::string &pubkey, std::chrono::milliseconds keep_alive, const std::string &hint) {
detail::send_control(get_control_socket(), "CONNECT", bt_serialize<bt_dict>({{"pubkey",pubkey}, {"keep-alive",keep_alive.count()}, {"hint",hint}}));
void LokiMQ::connect_sn(string_view pubkey, std::chrono::milliseconds keep_alive, string_view hint) {
check_started(proxy_thread, "connect");
detail::send_control(get_control_socket(), "CONNECT_SN", bt_serialize<bt_dict>({{"pubkey",pubkey}, {"keep-alive",keep_alive.count()}, {"hint",hint}}));
inline void LokiMQ::job(std::function<void()> f) {
void LokiMQ::connect_remote(string_view remote, ConnectSuccess on_connect, ConnectFailure on_failure,
string_view pubkey, std::chrono::milliseconds timeout) {
if (!proxy_thread.joinable())
LMQ_LOG(warn, "connect_remote() called before start(); this won't take effect until start() is called");
LMQ_LOG(trace, "telling proxy to connect to ", remote, ", expecting pubkey [", to_hex(pubkey), "]");
detail::send_control(get_control_socket(), "CONNECT_REMOTE", bt_serialize<bt_dict>({
{"remote", remote},
{"pubkey", pubkey},
{"timeout", timeout.count()},
{"connect", reinterpret_cast<uintptr_t>(new ConnectSuccess{std::move(on_connect)})},
{"failure", reinterpret_cast<uintptr_t>(new ConnectFailure{std::move(on_failure)})},
void LokiMQ::proxy_connect_remote(bt_dict_consumer data) {
ConnectSuccess on_connect;
ConnectFailure on_failure;
std::string remote;
std::string remote_pubkey;
std::chrono::milliseconds timeout = REMOTE_CONNECT_TIMEOUT;
if (data.skip_until("connect")) {
auto* ptr = reinterpret_cast<ConnectSuccess*>(data.consume_integer<uintptr_t>());
on_connect = std::move(*ptr);
delete ptr;
if (data.skip_until("failure")) {
auto* ptr = reinterpret_cast<ConnectFailure*>(data.consume_integer<uintptr_t>());
on_failure = std::move(*ptr);
delete ptr;
if (data.skip_until("pubkey")) {
remote_pubkey = data.consume_string();
assert(remote_pubkey.size() == 32 || remote_pubkey.empty());
if (data.skip_until("remote"))
remote = data.consume_string();
if (data.skip_until("timeout"))
timeout = std::chrono::milliseconds{data.consume_integer<uint64_t>()};
if (remote.empty())
throw std::runtime_error("Internal error: CONNECT_REMOTE proxy command missing required 'remote' value");
LMQ_LOG(info, "Establishing remote connection to ", remote, remote_pubkey.empty() ? " (any pubkey)" : " expecting pubkey " + to_hex(remote_pubkey));
zmq::socket_t sock{context, zmq::socket_type::dealer};
try {
setup_outgoing_socket(sock, remote_pubkey);
} catch (const zmq::error_t &e) {
proxy_schedule_job([on_failure=std::move(on_failure), what="connect() failed: "s+e.what()] { on_failure(std::move(what)); });
LMQ_LOG(debug, "Opened new zmq socket to ", remote, ", sending HI");
send_direct_message(sock, "HI");
remotes.emplace_back("", std::move(sock));
pending_connects.emplace_back(remotes.size()-1, std::chrono::steady_clock::now() + timeout,
std::move(on_connect), std::move(on_failure));
void LokiMQ::disconnect_remote(string_view id, std::chrono::milliseconds linger) {
(void)id, (void)linger;
void LokiMQ::job(std::function<void()> f) {
auto* b = new Batch<void>;
auto* baseptr = static_cast<detail::Batch*>(b);
@ -1479,6 +1729,15 @@ std::ostream &operator<<(std::ostream &os, LogLevel lvl) {
return os;
std::string make_random_string(size_t size) {
static thread_local std::mt19937_64 rng{std::random_device{}()};
static thread_local std::uniform_int_distribution<char> dist{std::numeric_limits<char>::min(), std::numeric_limits<char>::max()};
std::string rando;
for (size_t i = 0; i < size; i++)
rando += dist(rng);
return rando;
} // namespace lokimq
// vim:sw=4:et

View File

@ -91,8 +91,10 @@ class Message {
LokiMQ& lokimq; ///< The owning LokiMQ object
std::vector<string_view> data; ///< The provided command data parts, if any.
string_view pubkey; ///< The originator pubkey (32 bytes)
string_view id; ///< The remote's unique, opaque id for routing.
string_view pubkey; ///< The remote's pubkey (32 bytes)
bool service_node; ///< True if the pubkey is an active SN (note that this is only checked on initial connection, not every received message)
std::string reply_tag; ///< If the invoked command is a request command this is the required reply tag that will be prepended by `send_reply()`.
/// Constructor
Message(LokiMQ& lmq) : lokimq{lmq} {}
@ -101,30 +103,45 @@ public:
Message(const Message&) = delete;
Message& operator=(const Message&) = delete;
/// Sends a reply. Arguments are forwarded to send() but with send_option::optional{} added
/// if the originator is not a SN. For SN messages (i.e. where `sn` is true) this is a
/// "strong" reply by default in that the proxy will attempt to establish a new connection
/// to the SN if no longer connected. For non-SN messages the reply will be attempted using
/// the available routing information, but if the connection has already been closed the
/// reply will be dropped.
/// Sends a command back to whomever sent this message. Arguments are forwarded to send() but
/// with send_option::optional{} added if the originator is not a SN. For SN messages (i.e.
/// where `sn` is true) this is a "strong" reply by default in that the proxy will attempt to
/// establish a new connection to the SN if no longer connected. For non-SN messages the reply
/// will be attempted using the available routing information, but if the connection has already
/// been closed the reply will be dropped.
/// If you want to send a non-strong reply even when the remote is a service node then add
/// an explicit `send_option::optional()` argument.
template <typename... Args>
void reply(const std::string& command, Args&&... args);
void send_back(const std::string& command, Args&&... args);
/// Sends a reply to a request. This takes no command: the command is always the built-in
/// "REPLY" command, followed by the unique reply tag, then any reply data parts. All other
/// arguments are as in `send_back()`.
template <typename... Args>
void send_reply(Args&&... args);
// Forward declarations; see batch.h
namespace detail { class Batch; }
template <typename R> class Batch;
/** The keep-alive time for a send() that results in a establishing a new outbound connection. To
* use a longer keep-alive to a host call `connect()` first with the desired keep-alive time or pass
* the send_option::keep_alive.
static constexpr auto DEFAULT_SEND_KEEP_ALIVE = 30s;
// How frequently we cleanup connections (closing idle connections, calling connect or request failure callbacks)
static constexpr auto CONN_CHECK_INTERVAL = 1s;
// The default timeout for connect_remote()
static constexpr auto REMOTE_CONNECT_TIMEOUT = 10s;
// The minimum amount of time we wait for a reply to a REQUEST before calling the callback with
// `false` to signal a timeout.
static constexpr auto REQUEST_TIMEOUT = 15s;
/// Maximum length of a category
static constexpr size_t MAX_CATEGORY_LENGTH = 50;
@ -195,12 +212,21 @@ public:
/// The callback type for registered commands.
using CommandCallback = std::function<void(Message& message)>;
/// The callback for making requests. This is called with `true` and a (moved) vector of data
/// part strings when we get a reply, or `false` and empty vector on timeout.
using ReplyCallback = std::function<void(bool success, std::vector<std::string> data)>;
/// Called to write a log message. This will only be called if the `level` is >= the current
/// LokiMQ object log level. It must be a raw function pointer (or a capture-less lambda) for
/// performance reasons. Takes four arguments: the log level of the message, the filename and
/// line number where the log message was invoked, and the log message itself.
using Logger = std::function<void(LogLevel level, const char* file, int line, std::string msg)>;
/// Callback for the success case of connect_remote()
using ConnectSuccess = std::function<void(const std::string& pubkey)>;
/// Callback for the failure case of connect_remote()
using ConnectFailure = std::function<void(const std::string& reason)>;
/// Explicitly non-copyable, non-movable because most things here aren't copyable, and a few
/// things aren't movable, either. If you need to pass the LokiMQ instance around, wrap it
/// in a unique_ptr or shared_ptr.
@ -211,11 +237,11 @@ public:
/** How long to wait for handshaking to complete on external connections before timing out and
* closing the connection. Setting this only affects new outgoing connections. */
std::chrono::milliseconds SN_HANDSHAKE_TIME = 10s;
std::chrono::milliseconds HANDSHAKE_TIME = 10s;
/** Maximum incoming message size; if a remote tries sending a message larger than this they get
* disconnected. -1 means no limit. */
int64_t SN_ZMQ_MAX_MSG_SIZE = 1 * 1024 * 1024;
int64_t MAX_MSG_SIZE = 1 * 1024 * 1024;
/** How long (in ms) to linger sockets when closing them; this is the maximum time zmq spends
* trying to sending pending messages before dropping them and closing the underlying socket
@ -254,6 +280,10 @@ private:
/// Info about a peer's established connection to us. Note that "established" means both
/// connected and authenticated.
struct peer_info {
/// Pubkey of the remote; can be empty (especially before handshake) but will only be set if
/// the pubkey has been verified.
std::string pubkey;
/// True if we've authenticated this peer as a service node.
bool service_node = false;
@ -281,15 +311,20 @@ private:
std::chrono::milliseconds idle_expiry;
struct pk_hash {
size_t operator()(const std::string& pubkey) const {
size_t h;
std::memcpy(&h,, sizeof(h));
return h;
/// Currently peer connections, pubkey -> peer_info
std::unordered_map<std::string, peer_info, pk_hash> peers;
/// Currently peer connections: id -> peer_info. id == pubkey for incoming and outgoing SN
/// connections; random string for outgoing direct connections.
std::unordered_map<std::string, peer_info> peers;
/// Remotes we are still trying to connect to (via connect_remote(), not connect_sn()); when
/// we pass handshaking we move them out of here and (if set) trigger the on_connect callback.
/// Unlike regular node-to-node peers, these have an extra "HI"/"HELLO" sequence that we used
/// before we consider ourselves connected to the remote.
std::vector<std::tuple<int /*remotes index*/, std::chrono::steady_clock::time_point, ConnectSuccess, ConnectFailure>> pending_connects;
/// Pending requests that have been sent out but not yet received a matching "REPLY". The value
/// is the timeout timestamp.
std::unordered_map<std::string, std::pair<std::chrono::steady_clock::time_point, ReplyCallback>>
/// different polling sockets the proxy handler polls: this always contains some internal
/// sockets for inter-thread communication followed by listener socket and a pollitem for every
@ -309,6 +344,8 @@ private:
/// The outgoing remote connections we currently have open along with the remote pubkeys. Each
/// element [i] here corresponds to an the pollitem_t at pollitems[i+1+poll_internal_size].
/// (Ideally we'd use one structure, but zmq requires the pollitems be in contiguous storage).
/// For new connections established via connect_remote the pubkey will be empty until we
/// do the HI/HELLO handshake over the socket.
std::vector<std::pair<std::string, zmq::socket_t>> remotes;
/// Socket we listen on to receive control messages in the proxy thread. Each thread has its own
@ -351,21 +388,25 @@ private:
/// Does the proxying work
void proxy_loop();
void proxy_conn_cleanup();
void proxy_worker_message(std::vector<zmq::message_t>& parts);
void proxy_process_queue();
Batch<void>* proxy_schedule_job(std::function<void()> f);
/// Looks up a peers element given a zmq message (which has the pubkey and sn status metadata
/// set during initial connection authentication), creating a new peer element if required.
decltype(peers)::iterator proxy_lookup_peer(zmq::message_t& msg);
/// Looks up a peers element given a connect index (for outgoing connections where we already
/// knew the pubkey and SN status) or an incoming zmq message (which has the pubkey and sn
/// status metadata set during initial connection authentication), creating a new peer element
/// if required.
decltype(peers)::iterator proxy_lookup_peer(int conn_index, zmq::message_t& msg);
/// Handles built-in primitive commands in the proxy thread for things like "BYE" that have to
/// be done in the proxy thread anyway (if we forwarded to a worker the worker would just have
/// to send an instruction back to the proxy to do it). Returns true if one was handled, false
/// to continue with sending to a worker.
bool proxy_handle_builtin(size_t conn_index, std::vector<zmq::message_t>& parts);
bool proxy_handle_builtin(int conn_index, std::vector<zmq::message_t>& parts);
struct run_info;
/// Gets an idle worker's run_info and removes the worker from the idle worker list. If there
@ -387,24 +428,31 @@ private:
/// gets called after all works have done so.
void proxy_quit();
// Sets the various properties on an outgoing socket prior to connection.
void setup_outgoing_socket(zmq::socket_t& socket, string_view remote_pubkey = {});
/// Common connection implementation used by proxy_connect/proxy_send. Returns the socket
/// and, if a routing prefix is needed, the required prefix (or an empty string if not needed).
/// For an optional connect that fail, returns nullptr for the socket.
std::pair<zmq::socket_t*, std::string> proxy_connect(const std::string& pubkey, const std::string& connect_hint, bool optional, bool incoming_only, std::chrono::milliseconds keep_alive);
std::pair<zmq::socket_t*, std::string> proxy_connect_sn(const std::string& pubkey, const std::string& connect_hint, bool optional, bool incoming_only, std::chrono::milliseconds keep_alive);
/// CONNECT command telling us to connect to a new pubkey. Returns the socket (which could be
/// CONNECT_SN command telling us to connect to a new pubkey. Returns the socket (which could be
/// existing or a new one).
std::pair<zmq::socket_t*, std::string> proxy_connect(bt_dict&& data);
std::pair<zmq::socket_t*, std::string> proxy_connect_sn(bt_dict&& data);
/// Opens a new connection to a remote, with callbacks. This is the proxy-side implementation
/// of the `connect_remote()` call.
void proxy_connect_remote(bt_dict_consumer data);
/// Called to disconnect our remote connection to the given pubkey (if we have one).
void proxy_disconnect(const std::string& pubkey);
/// SEND command. Does a connect first, if necessary.
void proxy_send(bt_dict&& data);
void proxy_send(bt_dict_consumer data);
/// REPLY command. Like SEND, but only has a listening socket route to send back to and so is
/// weaker (i.e. it cannot reconnect to the SN if the connection is no longer open).
void proxy_reply(bt_dict&& data);
void proxy_reply(bt_dict_consumer data);
/// Currently active batches.
std::unordered_set<detail::Batch*> batches;
@ -438,6 +486,9 @@ private:
/// affects outgoing connections; incomings connections are the responsibility of the other end.
void proxy_expire_idle_peers();
/// Helper method to actually close a remote connection and update the stuff that needs updating.
void proxy_close_remote(int removed, bool linger = true);
/// Closes an outgoing connection immediately, updates internal variables appropriately.
/// Returns the next iterator (the original may or may not be removed from peers, depending on
/// whether or not it also has an active incoming connection).
@ -445,7 +496,7 @@ private:
struct category {
Access access;
std::unordered_map<std::string, CommandCallback> commands;
std::unordered_map<std::string, std::pair<CommandCallback, bool /*is_request*/>> commands;
unsigned int reserved_threads = 0;
unsigned int active_threads = 0;
int max_queue = 200;
@ -466,7 +517,7 @@ private:
/// Retrieve category and callback from a command name, including alias mapping. Warns on
/// invalid commands and returns nullptrs. The command name will be updated in place if it is
/// aliased to another command.
std::pair<category*, const CommandCallback*> get_command(std::string& command);
std::pair<category*, const std::pair<CommandCallback, bool>*> get_command(std::string& command);
/// Checks a peer's authentication level. Returns true if allowed, warns and returns false if
/// not.
@ -479,11 +530,12 @@ private:
category& cat;
std::string command;
std::vector<zmq::message_t> data_parts;
const CommandCallback* callback;
const std::pair<CommandCallback, bool>* callback;
std::string pubkey;
std::string id;
bool service_node;
pending_command(category& cat, std::string command, std::vector<zmq::message_t> data_parts, const CommandCallback* callback, std::string pubkey, bool service_node)
pending_command(category& cat, std::string command, std::vector<zmq::message_t> data_parts, const std::pair<CommandCallback, bool>* callback, std::string pubkey, bool service_node)
: cat{cat}, command{std::move(command)}, data_parts{std::move(data_parts)}, callback{callback}, pubkey{std::move(pubkey)}, service_node{service_node} {}
std::list<pending_command> pending_commands;
@ -510,7 +562,7 @@ private:
int batch_jobno; // >= 0 for a job, -1 for the completion job
union {
const CommandCallback* callback; // set if !is_batch_job
const std::pair<CommandCallback, bool>* callback; // set if !is_batch_job
detail::Batch* batch; // set if is_batch_job
@ -576,6 +628,17 @@ public:
AllowFunc allow_connection,
Logger logger = [](LogLevel, const char*, int, std::string) { });
* Simplified LokiMQ constructor for a client. This does not bind, generates ephemeral keys,
* and doesn't have peer_lookup capabilities, and treats all remotes as "basic", non-service
* node connections (for command authenication purposes).
explicit LokiMQ(Logger logger = [](LogLevel, const char*, int, std::string) { })
: LokiMQ("", "", false, {},
[](const auto&) { return std::string{}; },
[](string_view, string_view) { return Allow{AuthLevel::basic}; },
std::move(logger)) {}
* Destructor; instructs the proxy to quit. The proxy tells all workers to quit, waits for them
* to quit and rejoins the threads then quits itself. The outer thread (where the destructor is
@ -630,6 +693,15 @@ public:
void add_command(const std::string& category, std::string name, CommandCallback callback);
* Adds a new "request" command to an existing category. These commands are just like normal
* commands, but are expected to call `msg.send_reply()` with any data parts on every request,
* while normal commands are more general.
* Parameters given here are identical to `add_command()`.
void add_request_command(const std::string& category, std::string name, CommandCallback callback);
* Adds a command alias; this is intended for temporary backwards compatibility: if any aliases
* are defined then every command (not just aliased ones) has to be checked on invocation to see
@ -648,8 +720,12 @@ public:
* Sets the number of worker threads reserved for batch jobs. If not called this defaults to
* half the number of hardware threads available (rounded up). This works exactly like reserved_threads
* for a category, but allows to batch jobs. See category for details.
* half the number of hardware threads available (rounded up). This works exactly like
* reserved_threads for a category, but allows to batch jobs. See category for details.
* Note that some internal jobs are counted as batch jobs: in particular timers added via
* add_timer() and replies received in response to request commands currently each take a batch
* job slot when invoked.
* Cannot be called after start()ing the LokiMQ instance.
@ -697,14 +773,54 @@ public:
* guarantee that the hint will be used; it is only usefully specified if the
* connection location has already been incidentally determined).
void connect(const std::string& pubkey, std::chrono::milliseconds keep_alive = 5min, const std::string& hint = "");
void connect_sn(string_view pubkey, std::chrono::milliseconds keep_alive = 5min, string_view hint = {});
* Queue a message to be relayed to the SN identified with the given pubkey without expecting a
* reply. LokiMQ will attempt to relay the message (first connecting and handshaking if not
* already connected to the given SN).
* Establish a connection to the given remote with callbacks invoked on a successful or failed
* connection. The success callback gives you the pubkey of the remote, which can then be used
* to send commands to the remote (via `send()`). is generally intended for cases where the remote is
* being treated as the "server" and the local connection as a "client"; for connections between
* peers (i.e. between SNs) you generally want connect_sn() instead. If pubkey is non-empty
* then the remote must have that pubkey; if empty then any pubkey is allowed.
* If a new connection it established it will have a relatively short (30s) idle timeout. If
* Unlike `connect_sn`, the connection established here will be kept open
* indefinitely (until you call disconnect).
* The `on_connect` and `on_failure` callbacks are invoked when a connection has been
* established or failed to establish.
* @param remote the remote connection address, such as `tcp://localhost:1234`.
* @param on_connect called with the identifier and the remote's pubkey after the connection has
* been established and handshaked.
* @param on_failure called with a failure message if we fail to connect.
* @param pubkey the required remote pubkey (empty to accept any).
* @param timeout how long to try before aborting the connection attempt and calling the
* on_failure callback. Note that the connection can fail for various reasons before the
* timeout.
void connect_remote(string_view remote, ConnectSuccess on_connect, ConnectFailure on_failure,
string_view pubkey = {}, std::chrono::milliseconds timeout = REMOTE_CONNECT_TIMEOUT);
* Disconnects an established outgoing connection established with `connect_remote()`.
* @param id the connection id, as returned by `connect_remote()`.
* @param linger how long to allow the connection to linger while there are still pending
* outbound messages to it before disconnecting and dropping any pending messages. (Note that
* this lingering is internal; the disconnect_remote() call does not block). The default is 1
* second.
void disconnect_remote(string_view id, std::chrono::milliseconds linger = 1s);
* Queue a message to be relayed to the node identified with the given identifier (for SNs and
* incoming connections this is a pubkey; for connections established with `connect()` this will
* be the opaque string returned by `connect()`), without expecting a reply. LokiMQ will
* attempt to relay the message (first connecting and handshaking if not already connected
* and the given pubkey is a service node's pubkey).
* If a new connection is established it will have a relatively short (30s) idle timeout. If
* the connection should stay open longer you should call `connect(pubkey, IDLETIME)` first.
* Note that this method (along with connect) doesn't block waiting for a connection or for the
@ -712,7 +828,7 @@ public:
* generally try hard to deliver it (reconnecting if the connection fails), but if the
* connection fails persistently the message will eventually be dropped.
* @param pubkey - the pubkey to send this to
* @param id - the pubkey or identifier returned by `connect()` to send this to
* @param cmd - the first data frame value which is almost always the remote "category.command" name
* @param opts - any number of std::string and send options. Each send option affects
* how the send works; each string becomes a serialized message part.
@ -728,19 +844,20 @@ public:
template <typename... T>
void send(const std::string& pubkey, const std::string& cmd, const T&... opts);
* Similar to the above, but takes an iterator pair of message parts to send after the value.
/** Send a command configured as a "REQUEST" command: the data parts will be prefixed with a
* random identifier. The remote is expected to reply with a ["REPLY", <identifier>, ...]
* message, at which point we invoke the given callback with any [...] parts of the reply.
* @param pubkey - the pubkey to send this to
* @param cmd - the value of the first message part (i.e. the remote command)
* @param first - an input iterator to std::string values
* @param last - the beyond-the-end iterator
* @param opts - any number of send options. This may also contain additional message strings
* which will be appended after the `[first, last)` message parts.
* @param pubkey - the pubkey to send this request to
* @param cmd - the command name
* @param callback - the callback to invoke when we get a reply. Called with a true value and
* the data strings when a reply is received, or false and an empty vector of data parts if we
* get no reply in the timeout interval.
* @param opts - anything else (i.e. strings, send_options) is forwarded to send().
template <typename InputIt, typename... T>
void send(const std::string& pubkey, const std::string& cmd, InputIt first, InputIt end, const T&... opts);
template <typename... T>
void request(const std::string& pubkey, const std::string& cmd, ReplyCallback callback,
const T&... opts);
/// The key pair this LokiMQ was created with; if empty keys were given during construction then
/// this returns the generated keys.
@ -760,27 +877,38 @@ public:
* submitting a single-job, no-completion batch.
void job(std::function<void()> f);
* Adds a timer that gets scheduled periodically in the job queue. Normally jobs are not
* double-booked: that is, a new timed job will not be scheduled if the timer fires before a
* previously scheduled callback of the job has not yet completed. If you want to override this
* (so that, under heavy load or long jobs, there can be more than one of the same job scheduled
* or running at a time) then specify `squelch` as `false`.
void add_timer(std::function<void()> job, std::chrono::milliseconds interval, bool squelch = true);
/// Namespace for options to the send() method
namespace send_option {
/// `serialized` lets you serialize once when sending the same data to many peers by constructing a
/// single serialized option and passing it repeatedly rather than needing to reserialize on each
/// send.
struct serialized {
std::string data;
template <typename T>
serialized(const T& arg) : data{lokimq::bt_serialize(arg)} {}
template <typename InputIt>
struct data_parts_impl {
InputIt begin, end;
data_parts_impl(InputIt begin, InputIt end) : begin{std::move(begin)}, end{std::move(end)} {}
/// Specifies an iterator pair of data options to send, for when the number of arguments to send()
/// cannot be determined at compile time.
template <typename InputIt>
data_parts_impl<InputIt> data_parts(InputIt begin, InputIt end) { return {std::move(begin), std::move(end)}; }
/// Specifies a connection hint when passed in to send(). If there is no current connection to the
/// peer then the hint is used to save a call to the SNRemoteAddress to get the connection location.
/// (Note that there is no guarantee that the given hint will be used or that a SNRemoteAddress call
/// will not also be done.)
struct hint {
std::string connect_hint;
hint(std::string connect_hint) : connect_hint{std::move(connect_hint)} {}
explicit hint(std::string connect_hint) : connect_hint{std::move(connect_hint)} {}
/// Does a send() if we already have a connection (incoming or outgoing) with the given peer,
@ -806,75 +934,92 @@ namespace detail {
// data (only sent if the data is non-empty).
void send_control(zmq::socket_t& sock, string_view cmd, std::string data = {});
/// Base case: takes a serializable value and appends it to the message parts
template <typename T>
void apply_send_option(bt_list& parts, bt_dict&, const T& arg) {
/// Base case: takes a string-like value and appends it to the message parts
inline void apply_send_option(bt_list& parts, bt_dict&, string_view arg) {
/// `serialized` specialization: lets you serialize once when sending the same data to many peers
template <> inline void apply_send_option(bt_list& parts, bt_dict& , const send_option::serialized& serialized) {
/// `data_parts` specialization: appends a range of serialized data parts to the parts to send
template <typename InputIt>
void apply_send_option(bt_list& parts, bt_dict&, const send_option::data_parts_impl<InputIt> data) {
for (auto it = data.begin; it != data.end; ++it)
/// `hint` specialization: sets the hint in the control data
template <> inline void apply_send_option(bt_list&, bt_dict& control_data, const send_option::hint& hint) {
inline void apply_send_option(bt_list&, bt_dict& control_data, const send_option::hint& hint) {
control_data["hint"] = hint.connect_hint;
/// `optional` specialization: sets the optional flag in the control data
template <> inline void apply_send_option(bt_list&, bt_dict& control_data, const send_option::optional &) {
inline void apply_send_option(bt_list&, bt_dict& control_data, const send_option::optional &) {
control_data["optional"] = 1;
/// `incoming` specialization: sets the optional flag in the control data
template <> inline void apply_send_option(bt_list&, bt_dict& control_data, const send_option::incoming &) {
inline void apply_send_option(bt_list&, bt_dict& control_data, const send_option::incoming &) {
control_data["incoming"] = 1;
/// `keep_alive` specialization: increases the outgoing socket idle timeout (if shorter)
template <> inline void apply_send_option(bt_list&, bt_dict& control_data, const send_option::keep_alive& timeout) {
inline void apply_send_option(bt_list&, bt_dict& control_data, const send_option::keep_alive& timeout) {
control_data["keep-alive"] = timeout.time.count();
/// Calls apply_send_option on each argument and returns a bt_dict with the command plus data stored
/// in the "send" key plus whatever else is implied by any given option arguments.
template <typename InputIt, typename... T>
bt_dict send_control_data(const std::string& cmd, InputIt begin, InputIt end, const T &...opts) {
} // namespace detail
template <typename... T>
void LokiMQ::send(const std::string& pubkey, const std::string& cmd, const T &...opts) {
bt_dict control_data;
bt_list parts{{cmd}};
parts.insert(parts.end(), std::move(begin), std::move(end));
#ifdef __cpp_fold_expressions
(detail::apply_send_option(parts, control_data, opts),...);
(void) std::initializer_list<int>{(detail::apply_send_option(parts, control_data, opts), 0)...};
control_data["send"] = std::move(parts);
return control_data;
} // namespace detail
template <typename InputIt, typename... T>
void LokiMQ::send(const std::string& pubkey, const std::string& cmd, InputIt first, InputIt last, const T &...opts) {
bt_dict control_data = detail::send_control_data(cmd, std::move(first), std::move(last), opts...);
control_data["pubkey"] = pubkey;
control_data["send"] = std::move(parts);
detail::send_control(get_control_socket(), "SEND", bt_serialize(control_data));
std::string make_random_string(size_t size);
template <typename... T>
void LokiMQ::send(const std::string& pubkey, const std::string& cmd, const T &...opts) {
const std::string* no_it = nullptr;
send(pubkey, cmd, no_it, no_it, opts...);
void LokiMQ::request(const std::string& pubkey, const std::string& cmd, ReplyCallback callback, const T &...opts) {
auto reply_tag = make_random_string(15); // 15 should keep us in most stl implementations' small string optimization
bt_dict control_data;
bt_list parts{{cmd, reply_tag}};
#ifdef __cpp_fold_expressions
(detail::apply_send_option(parts, control_data, opts),...);
(void) std::initializer_list<int>{(detail::apply_send_option(parts, control_data, opts), 0)...};
control_data["pubkey"] = pubkey;
control_data["send"] = std::move(parts);
control_data["request"] = true;
control_data["request_callback"] = reinterpret_cast<uintptr_t>(new ReplyCallback{std::move(callback)});
control_data["request_tag"] = std::move(reply_tag);
detail::send_control(get_control_socket(), "SEND", bt_serialize(control_data));
template <typename... Args>
void Message::reply(const std::string& command, Args&&... args) {
void Message::send_back(const std::string& command, Args&&... args) {
if (service_node) lokimq.send(pubkey, command, std::forward<Args>(args)...);
else lokimq.send(pubkey, command, send_option::optional{}, std::forward<Args>(args)...);
template <typename... Args>
void Message::send_reply(Args&&... args) {
if (service_node) lokimq.send(pubkey, "REPLY", reply_tag, std::forward<Args>(args)...);
else lokimq.send(pubkey, "REPLY", reply_tag, send_option::optional{}, std::forward<Args>(args)...);
template <typename... T>
void LokiMQ::log_(LogLevel lvl, const char* file, int line, const T&... stuff) {
@ -890,16 +1035,7 @@ void LokiMQ::log_(LogLevel lvl, const char* file, int line, const T&... stuff) {
logger(lvl, file, line, os.str());
std::ostream &operator<<(std::ostream &os, LogLevel lvl) {
os << (lvl == LogLevel::trace ? "trace" :
lvl == LogLevel::debug ? "debug" :
lvl == LogLevel::info ? "info" :
lvl == LogLevel::warn ? "warn" :
lvl == LogLevel::error ? "ERROR" :
lvl == LogLevel::fatal ? "FATAL" :
return os;
std::ostream &operator<<(std::ostream &os, LogLevel lvl);

View File

@ -4,6 +4,8 @@ add_subdirectory(Catch2)
add_executable(tests ${LMQ_TEST_SRC})

tests/test_commands.cpp Normal file
View File

@ -0,0 +1,64 @@
#include "lokimq/lokimq.h"
#include <future>
#include <catch2/catch.hpp>
using namespace lokimq;
TEST_CASE("basic commands", "[cmd-basic]") {
std::string listen = "tcp://";
LokiMQ server{
"", "", // generate ephemeral keys
false, // not a service node
[](auto &) { return ""; },
[](auto /*ip*/, auto /*pk*/) { return Allow{AuthLevel::none, false}; },
std::atomic<int> hellos{0}, his{0};
server.add_category("public", Access{AuthLevel::none});
server.add_command("public", "hello", [&](Message& m) {
// On every 1st, 3rd, 5th, ... hello send back a hi
if (hellos++ % 2 == 0)
LokiMQ client(
[](LogLevel, const char* file, int line, std::string msg) { std::cerr << file << ":" << line << " --C-- " << msg << "\n"; }
client.add_category("public", Access{AuthLevel::none});
client.add_command("public", "hi", [&](auto&) { his++; });
std::atomic<bool> connected{false}, failed{false};
std::string pubkey;
[&](std::string pk) { pubkey = std::move(pk); connected = true; },
[&](string_view) { failed = true; },
for (int i = 0; i < 20; i++) {
if (connected.load())
REQUIRE( connected.load() );
REQUIRE( !failed.load() );
REQUIRE( pubkey == server.get_pubkey() );
client.send(pubkey, "public.hello");
REQUIRE( hellos == 1 );
REQUIRE( his == 1 );
for (int i = 0; i < 50; i++)
client.send(pubkey, "public.hello");
REQUIRE( hellos == 51 );
REQUIRE( his == 26 );

tests/test_requests.cpp Normal file
View File

@ -0,0 +1,63 @@
#include "lokimq/lokimq.h"
#include <future>
#include <catch2/catch.hpp>
using namespace lokimq;
TEST_CASE("basic requests", "[req-basic]") {
std::string listen = "tcp://";
LokiMQ server{
"", "", // generate ephemeral keys
false, // not a service node
[](auto &) { return ""; },
[](auto /*ip*/, auto /*pk*/) { return Allow{AuthLevel::none, false}; },
std::atomic<int> hellos{0}, his{0};
server.add_category("public", Access{AuthLevel::none});
server.add_request_command("public", "hello", [&](Message& m) {
LokiMQ client(
[](LogLevel, const char* file, int line, std::string msg) { std::cerr << file << ":" << line << " --C-- " << msg << "\n"; }
std::atomic<bool> connected{false}, failed{false};
std::string pubkey;
[&](std::string pk) { pubkey = std::move(pk); connected = true; },
[&](string_view) { failed = true; },
for (int i = 0; i < 20; i++) {
if (connected.load())
REQUIRE( connected.load() );
REQUIRE( !failed.load() );
REQUIRE( pubkey == server.get_pubkey() );
std::atomic<bool> got_reply{false};
bool success;
std::vector<std::string> data;
client.request(pubkey, "public.hello", [&](bool ok, std::vector<std::string> data_) {
got_reply = true;
success = ok;
data = std::move(data_);
// FIXME: we shouldn't need to wait this long (perhaps explore zmq send immediate?)
REQUIRE( got_reply.load() );
REQUIRE( success );
REQUIRE( data == std::vector<std::string>{{"123"}} );