Merge pull request #70 from jagerman/internal-rebrand

Rebrand variables LMQ -> OMQ
This commit is contained in:
Jason Rhinelander 2021-11-30 14:14:30 -04:00 committed by GitHub
commit e382373f2e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 262 additions and 264 deletions

View File

@ -110,8 +110,8 @@ dictionaries. See `oxenmq/bt_serialize.h` if you want a bt serializer/deseriali
Sending a command to a peer is done by using a connection ID, and generally falls into either a
`send()` method or a `request()` method.
lmq.send(conn, "category.command", "some data");
lmq.request(conn, "category.command", [](bool success, std::vector<std::string> data) {
omq.send(conn, "category.command", "some data");
omq.request(conn, "category.command", [](bool success, std::vector<std::string> data) {
if (success) { std::cout << "Remote replied: " << data.at(0) << "\n"; } });
The connection ID generally has two possible values:
@ -127,13 +127,13 @@ The connection ID generally has two possible values:
```C++
// Send to a service node, establishing a connection if necessary:
std::string my_sn = ...; // 32-byte pubkey of a known SN
lmq.send(my_sn, "sn.explode", "{ \"seconds\": 30 }");
omq.send(my_sn, "sn.explode", "{ \"seconds\": 30 }");
// Connect to a remote by address then send it something
auto conn = lmq.connect_remote("tcp://127.0.0.1:4567",
auto conn = omq.connect_remote("tcp://127.0.0.1:4567",
[](ConnectionID c) { std::cout << "Connected!\n"; },
[](ConnectionID c, string_view f) { std::cout << "Connect failed: " << f << "\n" });
lmq.request(conn, "rpc.get_height", [](bool s, std::vector<std::string> d) {
omq.request(conn, "rpc.get_height", [](bool s, std::vector<std::string> d) {
if (s && d.size() == 1)
std::cout << "Current height: " << d[0] << "\n";
else
@ -332,7 +332,7 @@ void start_big_task() {
batch.completion(&continue_big_task);
lmq.batch(std::move(batch));
omq.batch(std::move(batch));
// ... to be continued in `continue_big_task` after all the jobs finish
// Can do other things here, but note that continue_big_task could run
@ -347,7 +347,7 @@ going to help you hurt yourself like that.
### Single-job queuing
As a shortcut there is a `lmq.job(...)` method that schedules a single task (with no return value)
As a shortcut there is a `omq.job(...)` method that schedules a single task (with no return value)
in the batch job queue. This is useful when some event requires triggering some other event, but
you don't need to wait for or collect its result. (Internally this is just a convenience method
around creating a single-job, no-completion Batch job).

View File

@ -37,22 +37,22 @@ bool OxenMQ::proxy_check_auth(int64_t conn_id, bool outgoing, const peer_info& p
std::string reply;
if (!cat_call.first) {
LMQ_LOG(warn, "Invalid command '", command, "' sent by remote [", to_hex(peer.pubkey), "]/", peer_address(cmd));
OMQ_LOG(warn, "Invalid command '", command, "' sent by remote [", to_hex(peer.pubkey), "]/", peer_address(cmd));
reply = "UNKNOWNCOMMAND";
} else if (peer.auth_level < cat_call.first->access.auth) {
LMQ_LOG(warn, "Access denied to ", command, " for peer [", to_hex(peer.pubkey), "]/", peer_address(cmd),
OMQ_LOG(warn, "Access denied to ", command, " for peer [", to_hex(peer.pubkey), "]/", peer_address(cmd),
": peer auth level ", peer.auth_level, " < ", cat_call.first->access.auth);
reply = "FORBIDDEN";
} else if (cat_call.first->access.local_sn && !local_service_node) {
LMQ_LOG(warn, "Access denied to ", command, " for peer [", to_hex(peer.pubkey), "]/", peer_address(cmd),
OMQ_LOG(warn, "Access denied to ", command, " for peer [", to_hex(peer.pubkey), "]/", peer_address(cmd),
": that command is only available when this OxenMQ is running in service node mode");
reply = "NOT_A_SERVICE_NODE";
} else if (cat_call.first->access.remote_sn && !peer.service_node) {
LMQ_LOG(warn, "Access denied to ", command, " for peer [", to_hex(peer.pubkey), "]/", peer_address(cmd),
OMQ_LOG(warn, "Access denied to ", command, " for peer [", to_hex(peer.pubkey), "]/", peer_address(cmd),
": remote is not recognized as a service node");
reply = "FORBIDDEN_SN";
} else if (cat_call.second->second /*is_request*/ && data.empty()) {
LMQ_LOG(warn, "Received an invalid request for '", command, "' with no reply tag from remote [",
OMQ_LOG(warn, "Received an invalid request for '", command, "' with no reply tag from remote [",
to_hex(peer.pubkey), "]/", peer_address(cmd));
reply = "NO_REPLY_TAG";
} else {
@ -75,7 +75,7 @@ bool OxenMQ::proxy_check_auth(int64_t conn_id, bool outgoing, const peer_info& p
send_message_parts(connections.at(conn_id), msgs);
} catch (const zmq::error_t& err) {
/* can't send: possibly already disconnected. Ignore. */
LMQ_LOG(debug, "Couldn't send auth failure message ", reply, " to peer [", to_hex(peer.pubkey), "]/", peer_address(cmd), ": ", err.what());
OMQ_LOG(debug, "Couldn't send auth failure message ", reply, " to peer [", to_hex(peer.pubkey), "]/", peer_address(cmd), ": ", err.what());
}
return false;
@ -97,7 +97,7 @@ void OxenMQ::proxy_set_active_sns(pubkey_set pubkeys) {
for (auto it = pubkeys.begin(); it != pubkeys.end(); ) {
auto& pk = *it;
if (pk.size() != 32) {
LMQ_LOG(warn, "Invalid private key of length ", pk.size(), " (", to_hex(pk), ") passed to set_active_sns");
OMQ_LOG(warn, "Invalid private key of length ", pk.size(), " (", to_hex(pk), ") passed to set_active_sns");
it = pubkeys.erase(it);
continue;
}
@ -106,7 +106,7 @@ void OxenMQ::proxy_set_active_sns(pubkey_set pubkeys) {
++it;
}
if (added.empty() && active_service_nodes.size() == pubkeys.size()) {
LMQ_LOG(debug, "set_active_sns(): new set of SNs is unchanged, skipping update");
OMQ_LOG(debug, "set_active_sns(): new set of SNs is unchanged, skipping update");
return;
}
for (const auto& pk : active_service_nodes) {
@ -141,7 +141,7 @@ void OxenMQ::proxy_update_active_sns(pubkey_set added, pubkey_set removed) {
for (auto it = removed.begin(); it != removed.end(); ) {
const auto& pk = *it;
if (pk.size() != 32) {
LMQ_LOG(warn, "Invalid private key of length ", pk.size(), " (", to_hex(pk), ") passed to update_active_sns (removed)");
OMQ_LOG(warn, "Invalid private key of length ", pk.size(), " (", to_hex(pk), ") passed to update_active_sns (removed)");
it = removed.erase(it);
} else if (!active_service_nodes.count(pk) || added.count(pk) /* added wins if in both */) {
it = removed.erase(it);
@ -153,7 +153,7 @@ void OxenMQ::proxy_update_active_sns(pubkey_set added, pubkey_set removed) {
for (auto it = added.begin(); it != added.end(); ) {
const auto& pk = *it;
if (pk.size() != 32) {
LMQ_LOG(warn, "Invalid private key of length ", pk.size(), " (", to_hex(pk), ") passed to update_active_sns (added)");
OMQ_LOG(warn, "Invalid private key of length ", pk.size(), " (", to_hex(pk), ") passed to update_active_sns (added)");
it = added.erase(it);
} else if (active_service_nodes.count(pk)) {
it = added.erase(it);
@ -166,7 +166,7 @@ void OxenMQ::proxy_update_active_sns(pubkey_set added, pubkey_set removed) {
}
void OxenMQ::proxy_update_active_sns_clean(pubkey_set added, pubkey_set removed) {
LMQ_LOG(debug, "Updating SN auth status with +", added.size(), "/-", removed.size(), " pubkeys");
OMQ_LOG(debug, "Updating SN auth status with +", added.size(), "/-", removed.size(), " pubkeys");
// For anything we remove we want close the connection to the SN (if outgoing), and remove the
// stored peer_info (incoming or outgoing).
@ -179,7 +179,7 @@ void OxenMQ::proxy_update_active_sns_clean(pubkey_set added, pubkey_set removed)
auto conn_id = it->second.conn_id;
it = peers.erase(it);
if (outgoing) {
LMQ_LOG(debug, "Closing outgoing connection to ", c);
OMQ_LOG(debug, "Closing outgoing connection to ", c);
proxy_close_connection(conn_id, CLOSE_LINGER);
}
}
@ -207,7 +207,7 @@ void OxenMQ::process_zap_requests() {
log(LogLevel::trace, __FILE__, __LINE__, o.str());
} else
#endif
LMQ_LOG(debug, "Processing ZAP authentication request");
OMQ_LOG(debug, "Processing ZAP authentication request");
// https://rfc.zeromq.org/spec:27/ZAP/
//
@ -240,7 +240,7 @@ void OxenMQ::process_zap_requests() {
std::string &status_code = response_vals[2], &status_text = response_vals[3];
if (frames.size() < 6 || view(frames[0]) != "1.0") {
LMQ_LOG(error, "Bad ZAP authentication request: version != 1.0 or invalid ZAP message parts");
OMQ_LOG(error, "Bad ZAP authentication request: version != 1.0 or invalid ZAP message parts");
status_code = "500";
status_text = "Internal error: invalid auth request";
} else {
@ -251,18 +251,18 @@ void OxenMQ::process_zap_requests() {
} catch (...) {}
if (bind_id >= bind.size()) {
LMQ_LOG(error, "Bad ZAP authentication request: invalid auth domain '", auth_domain, "'");
OMQ_LOG(error, "Bad ZAP authentication request: invalid auth domain '", auth_domain, "'");
status_code = "400";
status_text = "Unknown authentication domain: " + std::string{auth_domain};
} else if (bind[bind_id].curve
? !(frames.size() == 7 && view(frames[5]) == "CURVE")
: !(frames.size() == 6 && view(frames[5]) == "NULL")) {
LMQ_LOG(error, "Bad ZAP authentication request: invalid ",
OMQ_LOG(error, "Bad ZAP authentication request: invalid ",
bind[bind_id].curve ? "CURVE" : "NULL", " authentication request");
status_code = "500";
status_text = "Invalid authentication request mechanism";
} else if (bind[bind_id].curve && frames[6].size() != 32) {
LMQ_LOG(error, "Bad ZAP authentication request: invalid request pubkey");
OMQ_LOG(error, "Bad ZAP authentication request: invalid request pubkey");
status_code = "500";
status_text = "Invalid public key size for CURVE authentication";
} else {
@ -281,14 +281,14 @@ void OxenMQ::process_zap_requests() {
}
if (auth <= AuthLevel::denied || auth > AuthLevel::admin) {
LMQ_LOG(info, "Access denied for incoming ", view(frames[5]), (sn ? " service node" : " client"),
OMQ_LOG(info, "Access denied for incoming ", view(frames[5]), (sn ? " service node" : " client"),
" connection from ", !user_id.empty() ? user_id + " at " : ""s, ip,
" with initial auth level ", auth);
status_code = "400";
status_text = "Access denied";
user_id.clear();
} else {
LMQ_LOG(debug, "Accepted incoming ", view(frames[5]), (sn ? " service node" : " client"),
OMQ_LOG(debug, "Accepted incoming ", view(frames[5]), (sn ? " service node" : " client"),
" connection with authentication level ", auth,
" from ", !user_id.empty() ? user_id + " at " : ""s, ip);
@ -301,7 +301,7 @@ void OxenMQ::process_zap_requests() {
}
}
LMQ_TRACE("ZAP request result: ", status_code, " ", status_text);
OMQ_TRACE("ZAP request result: ", status_code, " ", status_text);
std::vector<zmq::message_t> response;
response.reserve(response_vals.size());

View File

@ -119,10 +119,10 @@ OxenMQ::proxy_connect_sn(std::string_view remote, std::string_view connect_hint,
}
if (peer) {
LMQ_TRACE("proxy asked to connect to ", to_hex(remote), "; reusing existing connection");
OMQ_TRACE("proxy asked to connect to ", to_hex(remote), "; reusing existing connection");
if (peer->route.empty() /* == outgoing*/) {
if (peer->idle_expiry < keep_alive) {
LMQ_LOG(debug, "updating existing outgoing peer connection idle expiry time from ",
OMQ_LOG(debug, "updating existing outgoing peer connection idle expiry time from ",
peer->idle_expiry.count(), "ms to ", keep_alive.count(), "ms");
peer->idle_expiry = keep_alive;
}
@ -130,12 +130,12 @@ OxenMQ::proxy_connect_sn(std::string_view remote, std::string_view connect_hint,
}
return {&connections[peer->conn_id], peer->route};
} else if (optional || incoming_only) {
LMQ_LOG(debug, "proxy asked for optional or incoming connection, but no appropriate connection exists so aborting connection attempt");
OMQ_LOG(debug, "proxy asked for optional or incoming connection, but no appropriate connection exists so aborting connection attempt");
return {nullptr, ""s};
}
// No connection so establish a new one
LMQ_LOG(debug, "proxy establishing new outbound connection to ", to_hex(remote));
OMQ_LOG(debug, "proxy establishing new outbound connection to ", to_hex(remote));
std::string addr;
bool to_self = false && remote == pubkey; // FIXME; need to use a separate listening socket for this, otherwise we can't easily
// tell it wasn't from a remote.
@ -147,15 +147,15 @@ OxenMQ::proxy_connect_sn(std::string_view remote, std::string_view connect_hint,
if (addr.empty())
addr = sn_lookup(remote);
else
LMQ_LOG(debug, "using connection hint ", connect_hint);
OMQ_LOG(debug, "using connection hint ", connect_hint);
if (addr.empty()) {
LMQ_LOG(error, "peer lookup failed for ", to_hex(remote));
OMQ_LOG(error, "peer lookup failed for ", to_hex(remote));
return {nullptr, ""s};
}
}
LMQ_LOG(debug, to_hex(pubkey), " (me) connecting to ", addr, " to reach ", to_hex(remote));
OMQ_LOG(debug, to_hex(pubkey), " (me) connecting to ", addr, " to reach ", to_hex(remote));
zmq::socket_t socket{context, zmq::socket_type::dealer};
setup_outgoing_socket(socket, remote, use_ephemeral_routing_id);
try {
@ -163,7 +163,7 @@ OxenMQ::proxy_connect_sn(std::string_view remote, std::string_view connect_hint,
} catch (const zmq::error_t& e) {
// Note that this failure cases indicates something serious went wrong that means zmq isn't
// even going to try connecting (for example an unparseable remote address).
LMQ_LOG(error, "Outgoing connection to ", addr, " failed: ", e.what());
OMQ_LOG(error, "Outgoing connection to ", addr, " failed: ", e.what());
return {nullptr, ""s};
}
@ -211,10 +211,10 @@ std::pair<zmq::socket_t *, std::string> OxenMQ::proxy_connect_sn(bt_dict_consume
void OxenMQ::proxy_close_connection(int64_t id, std::chrono::milliseconds linger) {
auto it = connections.find(id);
if (it == connections.end()) {
LMQ_LOG(warn, "internal error: connection to close (", id, ") doesn't exist!");
OMQ_LOG(warn, "internal error: connection to close (", id, ") doesn't exist!");
return;
}
LMQ_LOG(debug, "Closing conn ", id);
OMQ_LOG(debug, "Closing conn ", id);
it->second.set(zmq::sockopt::linger, linger > 0ms ? (int) linger.count() : 0);
connections.erase(it);
connections_updated = true;
@ -228,13 +228,13 @@ void OxenMQ::proxy_expire_idle_peers() {
if (info.outgoing()) {
auto idle = std::chrono::steady_clock::now() - info.last_activity;
if (idle > info.idle_expiry) {
LMQ_LOG(debug, "Closing outgoing connection to ", it->first, ": idle time (",
OMQ_LOG(debug, "Closing outgoing connection to ", it->first, ": idle time (",
std::chrono::duration_cast<std::chrono::milliseconds>(idle).count(), "ms) reached connection timeout (",
info.idle_expiry.count(), "ms)");
proxy_close_connection(info.conn_id, CLOSE_LINGER);
it = peers.erase(it);
} else {
LMQ_LOG(trace, "Not closing ", it->first, ": ", std::chrono::duration_cast<std::chrono::milliseconds>(idle).count(),
OMQ_LOG(trace, "Not closing ", it->first, ": ", std::chrono::duration_cast<std::chrono::milliseconds>(idle).count(),
"ms <= ", info.idle_expiry.count(), "ms");
++it;
continue;
@ -246,17 +246,17 @@ void OxenMQ::proxy_expire_idle_peers() {
}
void OxenMQ::proxy_conn_cleanup() {
LMQ_TRACE("starting proxy connections cleanup");
OMQ_TRACE("starting proxy connections cleanup");
// Drop idle connections (if we haven't done it in a while)
LMQ_TRACE("closing idle connections");
OMQ_TRACE("closing idle connections");
proxy_expire_idle_peers();
auto now = std::chrono::steady_clock::now();
// FIXME - check other outgoing connections to see if they died and if so purge them
LMQ_TRACE("Timing out pending outgoing connections");
OMQ_TRACE("Timing out pending outgoing connections");
// Check any pending outgoing connections for timeout
for (auto it = pending_connects.begin(); it != pending_connects.end(); ) {
auto& pc = *it;
@ -270,12 +270,12 @@ void OxenMQ::proxy_conn_cleanup() {
}
}
LMQ_TRACE("Timing out pending requests");
OMQ_TRACE("Timing out pending requests");
// Remove any expired pending requests and schedule their callback with a failure
for (auto it = pending_requests.begin(); it != pending_requests.end(); ) {
auto& callback = it->second;
if (callback.first < now) {
LMQ_LOG(debug, "pending request ", to_hex(it->first), " expired, invoking callback with failure status and removing");
OMQ_LOG(debug, "pending request ", to_hex(it->first), " expired, invoking callback with failure status and removing");
job([callback = std::move(callback.second)] { callback(false, {{"TIMEOUT"s}}); });
it = pending_requests.erase(it);
} else {
@ -283,7 +283,7 @@ void OxenMQ::proxy_conn_cleanup() {
}
}
LMQ_TRACE("done proxy connections cleanup");
OMQ_TRACE("done proxy connections cleanup");
};
void OxenMQ::proxy_connect_remote(bt_dict_consumer data) {
@ -318,7 +318,7 @@ void OxenMQ::proxy_connect_remote(bt_dict_consumer data) {
if (conn_id == -1 || remote.empty())
throw std::runtime_error("Internal error: CONNECT_REMOTE proxy command missing required 'conn_id' and/or 'remote' value");
LMQ_LOG(debug, "Establishing remote connection to ", remote, remote_pubkey.empty() ? " (NULL auth)" : " via CURVE expecting pubkey " + to_hex(remote_pubkey));
OMQ_LOG(debug, "Establishing remote connection to ", remote, remote_pubkey.empty() ? " (NULL auth)" : " via CURVE expecting pubkey " + to_hex(remote_pubkey));
zmq::socket_t sock{context, zmq::socket_type::dealer};
try {
@ -333,7 +333,7 @@ void OxenMQ::proxy_connect_remote(bt_dict_consumer data) {
auto &s = connections.emplace_hint(connections.end(), conn_id, std::move(sock))->second;
connections_updated = true;
LMQ_LOG(debug, "Opened new zmq socket to ", remote, ", conn_id ", conn_id, "; sending HI");
OMQ_LOG(debug, "Opened new zmq socket to ", remote, ", conn_id ", conn_id, "; sending HI");
send_direct_message(s, "HI");
pending_connects.emplace_back(conn_id, std::chrono::steady_clock::now() + timeout,
std::move(on_connect), std::move(on_failure));
@ -363,18 +363,18 @@ void OxenMQ::proxy_disconnect(bt_dict_consumer data) {
proxy_disconnect(std::move(connid), linger);
}
void OxenMQ::proxy_disconnect(ConnectionID conn, std::chrono::milliseconds linger) {
LMQ_TRACE("Disconnecting outgoing connection to ", conn);
OMQ_TRACE("Disconnecting outgoing connection to ", conn);
auto pr = peers.equal_range(conn);
for (auto it = pr.first; it != pr.second; ++it) {
auto& peer = it->second;
if (peer.outgoing()) {
LMQ_LOG(debug, "Closing outgoing connection to ", conn);
OMQ_LOG(debug, "Closing outgoing connection to ", conn);
proxy_close_connection(peer.conn_id, linger);
peers.erase(it);
return;
}
}
LMQ_LOG(warn, "Failed to disconnect ", conn, ": no such outgoing connection");
OMQ_LOG(warn, "Failed to disconnect ", conn, ": no such outgoing connection");
}

View File

@ -7,7 +7,7 @@ namespace oxenmq {
void OxenMQ::proxy_batch(detail::Batch* batch) {
batches.insert(batch);
const auto [jobs, tagged_threads] = batch->size();
LMQ_TRACE("proxy queuing batch job with ", jobs, " jobs", tagged_threads ? " (job uses tagged thread(s))" : "");
OMQ_TRACE("proxy queuing batch job with ", jobs, " jobs", tagged_threads ? " (job uses tagged thread(s))" : "");
if (!tagged_threads) {
for (size_t i = 0; i < jobs; i++)
batch_jobs.emplace(batch, i);
@ -82,19 +82,19 @@ void OxenMQ::proxy_timer(bt_list_consumer timer_data) {
void OxenMQ::_queue_timer_job(int timer_id) {
auto it = timer_jobs.find(timer_id);
if (it == timer_jobs.end()) {
LMQ_LOG(warn, "Could not find timer job ", timer_id);
OMQ_LOG(warn, "Could not find timer job ", timer_id);
return;
}
auto& [func, squelch, running, thread] = it->second;
if (squelch && running) {
LMQ_LOG(debug, "Not running timer job ", timer_id, " because a job for that timer is still running");
OMQ_LOG(debug, "Not running timer job ", timer_id, " because a job for that timer is still running");
return;
}
if (thread == -1) { // Run directly in proxy thread
try { func(); }
catch (const std::exception &e) { LMQ_LOG(warn, "timer job ", timer_id, " raised an exception: ", e.what()); }
catch (...) { LMQ_LOG(warn, "timer job ", timer_id, " raised a non-std exception"); }
catch (const std::exception &e) { OMQ_LOG(warn, "timer job ", timer_id, " raised an exception: ", e.what()); }
catch (...) { OMQ_LOG(warn, "timer job ", timer_id, " raised a non-std exception"); }
return;
}
@ -104,15 +104,15 @@ void OxenMQ::_queue_timer_job(int timer_id) {
running = true;
b->completion([this,timer_id](auto results) {
try { results[0].get(); }
catch (const std::exception &e) { LMQ_LOG(warn, "timer job ", timer_id, " raised an exception: ", e.what()); }
catch (...) { LMQ_LOG(warn, "timer job ", timer_id, " raised a non-std exception"); }
catch (const std::exception &e) { OMQ_LOG(warn, "timer job ", timer_id, " raised an exception: ", e.what()); }
catch (...) { OMQ_LOG(warn, "timer job ", timer_id, " raised a non-std exception"); }
auto it = timer_jobs.find(timer_id);
if (it != timer_jobs.end())
it->second.running = false;
}, OxenMQ::run_in_proxy);
}
batches.insert(b);
LMQ_TRACE("b: ", b->size().first, ", ", b->size().second, "; thread = ", thread);
OMQ_TRACE("b: ", b->size().first, ", ", b->size().second, "; thread = ", thread);
assert(b->size() == std::make_pair(size_t{1}, thread > 0));
auto& queue = thread > 0
? std::get<std::queue<batch_job>>(tagged_workers[thread - 1])
@ -172,7 +172,7 @@ TaggedThreadID OxenMQ::add_tagged_thread(std::string name, std::function<void()>
busy = false;
run.worker_id = tagged_workers.size(); // We want index + 1 (b/c 0 is used for non-tagged jobs)
run.worker_routing_id = "t" + std::to_string(run.worker_id);
LMQ_TRACE("Created new tagged thread ", name, " with routing id ", run.worker_routing_id);
OMQ_TRACE("Created new tagged thread ", name, " with routing id ", run.worker_routing_id);
run.worker_thread = std::thread{&OxenMQ::worker_thread, this, run.worker_id, name, std::move(start)};

View File

@ -19,8 +19,8 @@ public:
std::string remote; ///< Some sort of remote address from which the request came. Often "IP" for TCP connections and "localhost:UID:GID:PID" for unix socket connections.
/// Constructor
Message(OxenMQ& lmq, ConnectionID cid, Access access, std::string remote)
: oxenmq{lmq}, conn{std::move(cid)}, access{std::move(access)}, remote{std::move(remote)} {}
Message(OxenMQ& omq, ConnectionID cid, Access access, std::string remote)
: oxenmq{omq}, conn{std::move(cid)}, access{std::move(access)}, remote{std::move(remote)} {}
// Non-copyable
Message(const Message&) = delete;
@ -49,7 +49,7 @@ public:
void send_reply(Args&&... args);
/// Sends a request back to whomever sent this message. This is effectively a wrapper around
/// lmq.request() that takes care of setting up the recipient arguments.
/// omq.request() that takes care of setting up the recipient arguments.
template <typename ReplyCallback, typename... Args>
void send_request(std::string_view command, ReplyCallback&& callback, Args&&... args);

View File

@ -2,15 +2,15 @@
#include "oxenmq.h"
// Inside some method:
// LMQ_LOG(warn, "bad ", 42, " stuff");
// OMQ_LOG(warn, "bad ", 42, " stuff");
//
#define LMQ_LOG(level, ...) log(LogLevel::level, __FILE__, __LINE__, __VA_ARGS__)
#define OMQ_LOG(level, ...) log(LogLevel::level, __FILE__, __LINE__, __VA_ARGS__)
#ifndef NDEBUG
// Same as LMQ_LOG(trace, ...) when not doing a release build; nothing under a release build.
# define LMQ_TRACE(...) log(LogLevel::trace, __FILE__, __LINE__, __VA_ARGS__)
// Same as OMQ_LOG(trace, ...) when not doing a release build; nothing under a release build.
# define OMQ_TRACE(...) log(LogLevel::trace, __FILE__, __LINE__, __VA_ARGS__)
#else
# define LMQ_TRACE(...)
# define OMQ_TRACE(...)
#endif
namespace oxenmq {

View File

@ -199,7 +199,7 @@ OxenMQ::OxenMQ(
sn_lookup{std::move(lookup)}, log_lvl{level}, logger{std::move(logger)}
{
LMQ_TRACE("Constructing OxenMQ, id=", object_id, ", this=", this);
OMQ_TRACE("Constructing OxenMQ, id=", object_id, ", this=", this);
if (sodium_init() == -1)
throw std::runtime_error{"libsodium initialization failed"};
@ -209,7 +209,7 @@ OxenMQ::OxenMQ(
} else if (pubkey.empty()) {
if (service_node)
throw std::invalid_argument("Cannot construct a service node mode OxenMQ without a keypair");
LMQ_LOG(debug, "generating x25519 keypair for remote-only OxenMQ instance");
OMQ_LOG(debug, "generating x25519 keypair for remote-only OxenMQ instance");
pubkey.resize(crypto_box_PUBLICKEYBYTES);
privkey.resize(crypto_box_SECRETKEYBYTES);
crypto_box_keypair(reinterpret_cast<unsigned char*>(&pubkey[0]), reinterpret_cast<unsigned char*>(&privkey[0]));
@ -232,13 +232,13 @@ void OxenMQ::start() {
if (proxy_thread.joinable())
throw std::logic_error("Cannot call start() multiple times!");
LMQ_LOG(info, "Initializing OxenMQ ", bind.empty() ? "remote-only" : "listener", " with pubkey ", to_hex(pubkey));
OMQ_LOG(info, "Initializing OxenMQ ", bind.empty() ? "remote-only" : "listener", " with pubkey ", to_hex(pubkey));
int zmq_socket_limit = context.get(zmq::ctxopt::socket_limit);
if (MAX_SOCKETS > 1 && MAX_SOCKETS <= zmq_socket_limit)
context.set(zmq::ctxopt::max_sockets, MAX_SOCKETS);
else
LMQ_LOG(error, "Not applying OxenMQ::MAX_SOCKETS setting: ", MAX_SOCKETS, " must be in [1, ", zmq_socket_limit, "]");
OMQ_LOG(error, "Not applying OxenMQ::MAX_SOCKETS setting: ", MAX_SOCKETS, " must be in [1, ", zmq_socket_limit, "]");
// We bind `command` here so that the `get_control_socket()` below is always connecting to a
// bound socket, but we do nothing else here: the proxy thread is responsible for everything
@ -246,10 +246,10 @@ void OxenMQ::start() {
command.bind(SN_ADDR_COMMAND);
proxy_thread = std::thread{&OxenMQ::proxy_loop, this};
LMQ_LOG(debug, "Waiting for proxy thread to get ready...");
OMQ_LOG(debug, "Waiting for proxy thread to get ready...");
auto &control = get_control_socket();
detail::send_control(control, "START");
LMQ_TRACE("Sent START command");
OMQ_TRACE("Sent START command");
zmq::message_t ready_msg;
std::vector<zmq::message_t> parts;
@ -258,7 +258,7 @@ void OxenMQ::start() {
if (!(parts.size() == 1 && view(parts.front()) == "READY"))
throw std::runtime_error("Invalid startup message from proxy thread (didn't get expected READY message)");
LMQ_LOG(debug, "Proxy thread is ready");
OMQ_LOG(debug, "Proxy thread is ready");
}
void OxenMQ::listen_curve(std::string bind_addr, AllowFunc allow_connection, std::function<void(bool)> on_bind) {
@ -286,7 +286,7 @@ void OxenMQ::listen_plain(std::string bind_addr, AllowFunc allow_connection, std
std::pair<OxenMQ::category*, const std::pair<OxenMQ::CommandCallback, bool>*> OxenMQ::get_command(std::string& command) {
if (command.size() > MAX_CATEGORY_LENGTH + 1 + MAX_COMMAND_LENGTH) {
LMQ_LOG(warn, "Invalid command '", command, "': command too long");
OMQ_LOG(warn, "Invalid command '", command, "': command too long");
return {};
}
@ -298,7 +298,7 @@ std::pair<OxenMQ::category*, const std::pair<OxenMQ::CommandCallback, bool>*> Ox
auto dot = command.find('.');
if (dot == 0 || dot == std::string::npos) {
LMQ_LOG(warn, "Invalid command '", command, "': expected <category>.<command>");
OMQ_LOG(warn, "Invalid command '", command, "': expected <category>.<command>");
return {};
}
std::string catname = command.substr(0, dot);
@ -306,14 +306,14 @@ std::pair<OxenMQ::category*, const std::pair<OxenMQ::CommandCallback, bool>*> Ox
auto catit = categories.find(catname);
if (catit == categories.end()) {
LMQ_LOG(warn, "Invalid command category '", catname, "'");
OMQ_LOG(warn, "Invalid command category '", catname, "'");
return {};
}
const auto& category = catit->second;
auto callback_it = category.commands.find(cmd);
if (callback_it == category.commands.end()) {
LMQ_LOG(warn, "Invalid command '", command, "'");
OMQ_LOG(warn, "Invalid command '", command, "'");
return {};
}
@ -416,10 +416,10 @@ OxenMQ::~OxenMQ() {
return;
}
LMQ_LOG(info, "OxenMQ shutting down proxy thread");
OMQ_LOG(info, "OxenMQ shutting down proxy thread");
detail::send_control(get_control_socket(), "QUIT");
proxy_thread.join();
LMQ_LOG(info, "OxenMQ proxy thread has stopped");
OMQ_LOG(info, "OxenMQ proxy thread has stopped");
}
std::ostream &operator<<(std::ostream &os, LogLevel lvl) {

View File

@ -680,7 +680,7 @@ private:
Access access;
std::string remote;
// Normal ctor for an actual lmq command being processed
// Normal ctor for an actual omq command being processed
pending_command(category& cat, std::string command, std::vector<zmq::message_t> data_parts,
const std::pair<CommandCallback, bool>* callback, ConnectionID conn, Access access, std::string remote)
: cat{cat}, command{std::move(command)}, data_parts{std::move(data_parts)},
@ -963,7 +963,7 @@ public:
* another `set_active_sns()` or a `update_active_sns()` call). It *is* possible to make the
* initial call after calling `start()`, but that creates a window during which incoming
* remote SN connections will be erroneously treated as non-SN connections.
* - If this LMQ instance should accept incoming connections, set up any listening ports via
* - If this OMQ instance should accept incoming connections, set up any listening ports via
* `listen_curve()` and/or `listen_plain()`.
*/
void start();
@ -1147,13 +1147,13 @@ public:
* Example:
*
* // Send to a SN, connecting to it if we aren't already connected:
* lmq.send(pubkey, "hello.world", "abc", send_option::hint("tcp://localhost:1234"), "def");
* omq.send(pubkey, "hello.world", "abc", send_option::hint("tcp://localhost:1234"), "def");
*
* // Start connecting to a remote and immediately queue a message for it
* auto conn = lmq.connect_remote("tcp://127.0.0.1:1234",
* auto conn = omq.connect_remote("tcp://127.0.0.1:1234",
* [](ConnectionID) { std::cout << "connected\n"; },
* [](ConnectionID, string_view why) { std::cout << "connection failed: " << why << \n"; });
* lmq.send(conn, "hello.world", "abc", "def");
* omq.send(conn, "hello.world", "abc", "def");
*
* Both of these send the command `hello.world` to the given pubkey, containing additional
* message parts "abc" and "def". In the first case, if not currently connected, the given
@ -1204,7 +1204,7 @@ public:
* @param category - the category name that should handle the request for the purposes of
* scheduling the job. The category must have been added using add_category(). The category
* can be an actual category with added commands, in which case the injected tasks are queued
* along with LMQ requests for that category, or can have no commands to set up a distinct
* along with OMQ requests for that category, or can have no commands to set up a distinct
* category for the injected jobs.
*
* @param command - a command name; this is mainly used for debugging and does not need to
@ -1326,32 +1326,32 @@ public:
///
/// This allows simplifying:
///
/// lmq.add_category("foo", ...);
/// lmq.add_command("foo", "a", ...);
/// lmq.add_command("foo", "b", ...);
/// lmq.add_request_command("foo", "c", ...);
/// omq.add_category("foo", ...);
/// omq.add_command("foo", "a", ...);
/// omq.add_command("foo", "b", ...);
/// omq.add_request_command("foo", "c", ...);
///
/// to:
///
/// lmq.add_category("foo", ...)
/// omq.add_category("foo", ...)
/// .add_command("a", ...)
/// .add_command("b", ...)
/// .add_request_command("b", ...)
/// ;
class CatHelper {
OxenMQ& lmq;
OxenMQ& omq;
std::string cat;
public:
CatHelper(OxenMQ& lmq, std::string cat) : lmq{lmq}, cat{std::move(cat)} {}
CatHelper(OxenMQ& omq, std::string cat) : omq{omq}, cat{std::move(cat)} {}
CatHelper& add_command(std::string name, OxenMQ::CommandCallback callback) {
lmq.add_command(cat, std::move(name), std::move(callback));
omq.add_command(cat, std::move(name), std::move(callback));
return *this;
}
CatHelper& add_request_command(std::string name, OxenMQ::CommandCallback callback) {
lmq.add_request_command(cat, std::move(name), std::move(callback));
omq.add_request_command(cat, std::move(name), std::move(callback));
return *this;
}
};

View File

@ -20,7 +20,7 @@ extern "C" {
namespace oxenmq {
void OxenMQ::proxy_quit() {
LMQ_LOG(debug, "Received quit command, shutting down proxy thread");
OMQ_LOG(debug, "Received quit command, shutting down proxy thread");
assert(std::none_of(workers.begin(), workers.end(), [](auto& worker) { return worker.worker_thread.joinable(); }));
assert(std::none_of(tagged_workers.begin(), tagged_workers.end(), [](auto& worker) { return std::get<0>(worker).worker_thread.joinable(); }));
@ -38,7 +38,7 @@ void OxenMQ::proxy_quit() {
connections.clear();
peers.clear();
LMQ_LOG(debug, "Proxy thread teardown complete");
OMQ_LOG(debug, "Proxy thread teardown complete");
}
void OxenMQ::proxy_send(bt_dict_consumer data) {
@ -120,10 +120,10 @@ void OxenMQ::proxy_send(bt_dict_consumer data) {
if (!sock_route.first) {
nowarn = true;
if (optional)
LMQ_LOG(debug, "Not sending: send is optional and no connection to ",
OMQ_LOG(debug, "Not sending: send is optional and no connection to ",
to_hex(conn_id.pk), " is currently established");
else
LMQ_LOG(error, "Unable to send to ", to_hex(conn_id.pk), ": no valid connection address found");
OMQ_LOG(error, "Unable to send to ", to_hex(conn_id.pk), ": no valid connection address found");
break;
}
send_to = sock_route.first;
@ -131,20 +131,20 @@ void OxenMQ::proxy_send(bt_dict_consumer data) {
} else if (!conn_id.route.empty()) { // incoming non-SN connection
auto it = connections.find(conn_id.id);
if (it == connections.end()) {
LMQ_LOG(warn, "Unable to send to ", conn_id, ": incoming listening socket not found");
OMQ_LOG(warn, "Unable to send to ", conn_id, ": incoming listening socket not found");
break;
}
send_to = &it->second;
} else {
auto pr = peers.equal_range(conn_id);
if (pr.first == peers.end()) {
LMQ_LOG(warn, "Unable to send: connection id ", conn_id, " is not (or is no longer) a valid outgoing connection");
OMQ_LOG(warn, "Unable to send: connection id ", conn_id, " is not (or is no longer) a valid outgoing connection");
break;
}
auto& peer = pr.first->second;
auto it = connections.find(peer.conn_id);
if (it == connections.end()) {
LMQ_LOG(warn, "Unable to send: peer connection id ", conn_id, " is not (or is no longer) a valid outgoing connection");
OMQ_LOG(warn, "Unable to send: peer connection id ", conn_id, " is not (or is no longer) a valid outgoing connection");
break;
}
send_to = &it->second;
@ -155,7 +155,7 @@ void OxenMQ::proxy_send(bt_dict_consumer data) {
} catch (const zmq::error_t &e) {
if (e.num() == EHOSTUNREACH && !conn_id.route.empty() /*= incoming conn*/) {
LMQ_LOG(debug, "Incoming connection is no longer valid; removing peer details");
OMQ_LOG(debug, "Incoming connection is no longer valid; removing peer details");
auto pr = peers.equal_range(conn_id);
if (pr.first != peers.end()) {
@ -174,7 +174,7 @@ void OxenMQ::proxy_send(bt_dict_consumer data) {
// The incoming connection to the SN is no longer good, but we can retry because
// we may have another active connection with the SN (or may want to open one).
if (removed) {
LMQ_LOG(debug, "Retrying sending to SN ", to_hex(conn_id.pk), " using other sockets");
OMQ_LOG(debug, "Retrying sending to SN ", to_hex(conn_id.pk), " using other sockets");
retry = true;
}
}
@ -182,10 +182,10 @@ void OxenMQ::proxy_send(bt_dict_consumer data) {
}
if (!retry) {
if (!conn_id.sn() && !conn_id.route.empty()) { // incoming non-SN connection
LMQ_LOG(debug, "Unable to send message to incoming connection ", conn_id, ": ", e.what(),
OMQ_LOG(debug, "Unable to send message to incoming connection ", conn_id, ": ", e.what(),
"; remote has probably disconnected");
} else {
LMQ_LOG(warn, "Unable to send message to ", conn_id, ": ", e.what());
OMQ_LOG(warn, "Unable to send message to ", conn_id, ": ", e.what());
}
nowarn = true;
if (callback_nosend) {
@ -197,11 +197,11 @@ void OxenMQ::proxy_send(bt_dict_consumer data) {
}
if (request) {
if (sent) {
LMQ_LOG(debug, "Added new pending request ", to_hex(request_tag));
OMQ_LOG(debug, "Added new pending request ", to_hex(request_tag));
pending_requests.insert({ request_tag, {
std::chrono::steady_clock::now() + request_timeout, std::move(request_callback) }});
} else {
LMQ_LOG(debug, "Could not send request, scheduling request callback failure");
OMQ_LOG(debug, "Could not send request, scheduling request callback failure");
job([callback = std::move(request_callback)] { callback(false, {{"TIMEOUT"s}}); });
}
}
@ -211,7 +211,7 @@ void OxenMQ::proxy_send(bt_dict_consumer data) {
else if (callback_noqueue)
job(std::move(callback_noqueue));
else if (!nowarn)
LMQ_LOG(warn, "Unable to send message to ", conn_id, ": sending would block");
OMQ_LOG(warn, "Unable to send message to ", conn_id, ": sending would block");
}
}
@ -238,7 +238,7 @@ void OxenMQ::proxy_reply(bt_dict_consumer data) {
auto pr = peers.equal_range(conn_id);
if (pr.first == pr.second) {
LMQ_LOG(warn, "Unable to send tagged reply: the connection is no longer valid");
OMQ_LOG(warn, "Unable to send tagged reply: the connection is no longer valid");
return;
}
@ -251,18 +251,18 @@ void OxenMQ::proxy_reply(bt_dict_consumer data) {
} catch (const zmq::error_t &err) {
if (err.num() == EHOSTUNREACH) {
if (it->second.outgoing()) {
LMQ_LOG(debug, "Unable to send reply to non-SN request on outgoing socket: "
OMQ_LOG(debug, "Unable to send reply to non-SN request on outgoing socket: "
"remote is no longer connected; closing connection");
proxy_close_connection(it->second.conn_id, CLOSE_LINGER);
it = peers.erase(it);
++it;
} else {
LMQ_LOG(debug, "Unable to send reply to non-SN request on incoming socket: "
OMQ_LOG(debug, "Unable to send reply to non-SN request on incoming socket: "
"remote is no longer connected; removing peer details");
it = peers.erase(it);
}
} else {
LMQ_LOG(warn, "Unable to send reply to incoming non-SN request: ", err.what());
OMQ_LOG(warn, "Unable to send reply to incoming non-SN request: ", err.what());
++it;
}
}
@ -275,22 +275,22 @@ void OxenMQ::proxy_control_message(std::vector<zmq::message_t>& parts) {
if (parts.size() < 2)
throw std::logic_error("OxenMQ bug: Expected 2-3 message parts for a proxy control message");
auto route = view(parts[0]), cmd = view(parts[1]);
LMQ_TRACE("control message: ", cmd);
OMQ_TRACE("control message: ", cmd);
if (parts.size() == 3) {
LMQ_TRACE("...: ", parts[2]);
OMQ_TRACE("...: ", parts[2]);
auto data = view(parts[2]);
if (cmd == "SEND") {
LMQ_TRACE("proxying message");
OMQ_TRACE("proxying message");
return proxy_send(data);
} else if (cmd == "REPLY") {
LMQ_TRACE("proxying reply to non-SN incoming message");
OMQ_TRACE("proxying reply to non-SN incoming message");
return proxy_reply(data);
} else if (cmd == "BATCH") {
LMQ_TRACE("proxy batch jobs");
OMQ_TRACE("proxy batch jobs");
auto ptrval = bt_deserialize<uintptr_t>(data);
return proxy_batch(reinterpret_cast<detail::Batch*>(ptrval));
} else if (cmd == "INJECT") {
LMQ_TRACE("proxy inject");
OMQ_TRACE("proxy inject");
return proxy_inject_task(detail::deserialize_object<injected_task>(bt_deserialize<uintptr_t>(data)));
} else if (cmd == "SET_SNS") {
return proxy_set_active_sns(data);
@ -351,11 +351,11 @@ bool OxenMQ::proxy_bind(bind_data& b, size_t bind_index) {
b.on_bind = nullptr;
}
if (!good) {
LMQ_LOG(warn, "OxenMQ failed to listen on ", b.address);
OMQ_LOG(warn, "OxenMQ failed to listen on ", b.address);
return false;
}
LMQ_LOG(info, "OxenMQ listening on ", b.address);
OMQ_LOG(info, "OxenMQ listening on ", b.address);
b.conn_id = next_conn_id++;
connections.emplace_hint(connections.end(), b.conn_id, std::move(listener));
@ -368,11 +368,11 @@ bool OxenMQ::proxy_bind(bind_data& b, size_t bind_index) {
void OxenMQ::proxy_loop() {
#if defined(__linux__) || defined(__sun) || defined(__MINGW32__)
pthread_setname_np(pthread_self(), "lmq-proxy");
pthread_setname_np(pthread_self(), "omq-proxy");
#elif defined(__FreeBSD__) || defined(__NetBSD__) || defined(__OpenBSD__)
pthread_set_name_np(pthread_self(), "lmq-proxy");
pthread_set_name_np(pthread_self(), "omq-proxy");
#elif defined(__MACH__)
pthread_setname_np("lmq-proxy");
pthread_setname_np("omq-proxy");
#endif
zap_auth.set(zmq::sockopt::linger, 0);
@ -393,12 +393,12 @@ void OxenMQ::proxy_loop() {
}
if (log_level() >= LogLevel::debug) {
LMQ_LOG(debug, "Reserving space for ", max_workers, " max workers = ", general_workers, " general plus reservations for:");
OMQ_LOG(debug, "Reserving space for ", max_workers, " max workers = ", general_workers, " general plus reservations for:");
for (const auto& cat : categories)
LMQ_LOG(debug, " - ", cat.first, ": ", cat.second.reserved_threads);
LMQ_LOG(debug, " - (batch jobs): ", batch_jobs_reserved);
LMQ_LOG(debug, " - (reply jobs): ", reply_jobs_reserved);
LMQ_LOG(debug, "Plus ", tagged_workers.size(), " tagged worker threads");
OMQ_LOG(debug, " - ", cat.first, ": ", cat.second.reserved_threads);
OMQ_LOG(debug, " - (batch jobs): ", batch_jobs_reserved);
OMQ_LOG(debug, " - (reply jobs): ", reply_jobs_reserved);
OMQ_LOG(debug, "Plus ", tagged_workers.size(), " tagged worker threads");
}
workers.reserve(max_workers);
@ -420,7 +420,7 @@ void OxenMQ::proxy_loop() {
for (size_t i = 0; i < bind.size(); i++) {
if (!proxy_bind(bind[i], i)) {
LMQ_LOG(warn, "OxenMQ failed to listen on ", bind[i].address);
OMQ_LOG(warn, "OxenMQ failed to listen on ", bind[i].address);
throw zmq::error_t{};
}
}
@ -467,25 +467,25 @@ void OxenMQ::proxy_loop() {
// and send them back a "START" to let them know to go ahead with startup. We need this
// synchronization dance to guarantee that the workers are routable before we can proceed.
if (!tagged_workers.empty()) {
LMQ_LOG(debug, "Waiting for tagged workers");
OMQ_LOG(debug, "Waiting for tagged workers");
std::unordered_set<std::string_view> waiting_on;
for (auto& w : tagged_workers)
waiting_on.emplace(std::get<run_info>(w).worker_routing_id);
for (; !waiting_on.empty(); parts.clear()) {
recv_message_parts(workers_socket, parts);
if (parts.size() != 2 || view(parts[1]) != "STARTING"sv) {
LMQ_LOG(error, "Received invalid message on worker socket while waiting for tagged thread startup");
OMQ_LOG(error, "Received invalid message on worker socket while waiting for tagged thread startup");
continue;
}
LMQ_LOG(debug, "Received STARTING message from ", view(parts[0]));
OMQ_LOG(debug, "Received STARTING message from ", view(parts[0]));
if (auto it = waiting_on.find(view(parts[0])); it != waiting_on.end())
waiting_on.erase(it);
else
LMQ_LOG(error, "Received STARTING message from unknown worker ", view(parts[0]));
OMQ_LOG(error, "Received STARTING message from unknown worker ", view(parts[0]));
}
for (auto&w : tagged_workers) {
LMQ_LOG(debug, "Telling tagged thread worker ", std::get<run_info>(w).worker_routing_id, " to finish startup");
OMQ_LOG(debug, "Telling tagged thread worker ", std::get<run_info>(w).worker_routing_id, " to finish startup");
route_control(workers_socket, std::get<run_info>(w).worker_routing_id, "START");
}
}
@ -509,7 +509,7 @@ void OxenMQ::proxy_loop() {
if (proxy_skip_one_poll)
proxy_skip_one_poll = false;
else {
LMQ_TRACE("polling for new messages");
OMQ_TRACE("polling for new messages");
// We poll the control socket and worker socket for any incoming messages. If we have
// available worker room then also poll incoming connections and outgoing connections
@ -518,30 +518,30 @@ void OxenMQ::proxy_loop() {
zmq::poll(pollitems.data(), pollitems.size(), poll_timeout);
}
LMQ_TRACE("processing control messages");
OMQ_TRACE("processing control messages");
// Retrieve any waiting incoming control messages
for (parts.clear(); recv_message_parts(command, parts, zmq::recv_flags::dontwait); parts.clear()) {
proxy_control_message(parts);
}
LMQ_TRACE("processing worker messages");
OMQ_TRACE("processing worker messages");
for (parts.clear(); recv_message_parts(workers_socket, parts, zmq::recv_flags::dontwait); parts.clear()) {
proxy_worker_message(parts);
}
LMQ_TRACE("processing timers");
OMQ_TRACE("processing timers");
zmq_timers_execute(timers.get());
// Handle any zap authentication
LMQ_TRACE("processing zap requests");
OMQ_TRACE("processing zap requests");
process_zap_requests();
// See if we can drain anything from the current queue before we potentially add to it
// below.
LMQ_TRACE("processing queued jobs and messages");
OMQ_TRACE("processing queued jobs and messages");
proxy_process_queue();
LMQ_TRACE("processing new incoming messages");
OMQ_TRACE("processing new incoming messages");
// We round-robin connections when pulling off pending messages one-by-one rather than
// pulling off all messages from one connection before moving to the next; thus in cases of
@ -566,7 +566,7 @@ void OxenMQ::proxy_loop() {
++end %= queue.size();
if (parts.empty()) {
LMQ_LOG(warn, "Ignoring empty (0-part) incoming message");
OMQ_LOG(warn, "Ignoring empty (0-part) incoming message");
continue;
}
@ -576,12 +576,12 @@ void OxenMQ::proxy_loop() {
if (connections_updated) {
// If connections got updated then our points are stale, to restart the proxy loop;
// if there are still messages waiting we'll end up right back here.
LMQ_TRACE("connections became stale; short-circuiting incoming message loop");
OMQ_TRACE("connections became stale; short-circuiting incoming message loop");
break;
}
}
LMQ_TRACE("done proxy loop");
OMQ_TRACE("done proxy loop");
}
}
@ -597,7 +597,7 @@ bool OxenMQ::proxy_handle_builtin(int64_t conn_id, zmq::socket_t& sock, std::vec
std::string_view route, cmd;
if (parts.size() < 1 + incoming) {
LMQ_LOG(warn, "Received empty message; ignoring");
OMQ_LOG(warn, "Received empty message; ignoring");
return true;
}
if (incoming) {
@ -606,18 +606,18 @@ bool OxenMQ::proxy_handle_builtin(int64_t conn_id, zmq::socket_t& sock, std::vec
} else {
cmd = view(parts[0]);
}
LMQ_TRACE("Checking for builtins: '", cmd, "' from ", peer_address(parts.back()));
OMQ_TRACE("Checking for builtins: '", cmd, "' from ", peer_address(parts.back()));
if (cmd == "REPLY") {
size_t tag_pos = 1 + incoming;
if (parts.size() <= tag_pos) {
LMQ_LOG(warn, "Received REPLY without a reply tag; ignoring");
OMQ_LOG(warn, "Received REPLY without a reply tag; ignoring");
return true;
}
std::string reply_tag{view(parts[tag_pos])};
auto it = pending_requests.find(reply_tag);
if (it != pending_requests.end()) {
LMQ_LOG(debug, "Received REPLY for pending command ", to_hex(reply_tag), "; scheduling callback");
OMQ_LOG(debug, "Received REPLY for pending command ", to_hex(reply_tag), "; scheduling callback");
std::vector<std::string> data;
data.reserve(parts.size() - (tag_pos + 1));
for (auto it = parts.begin() + (tag_pos + 1); it != parts.end(); ++it)
@ -627,38 +627,38 @@ bool OxenMQ::proxy_handle_builtin(int64_t conn_id, zmq::socket_t& sock, std::vec
});
pending_requests.erase(it);
} else {
LMQ_LOG(warn, "Received REPLY with unknown or already handled reply tag (", to_hex(reply_tag), "); ignoring");
OMQ_LOG(warn, "Received REPLY with unknown or already handled reply tag (", to_hex(reply_tag), "); ignoring");
}
return true;
} else if (cmd == "HI") {
if (!incoming) {
LMQ_LOG(warn, "Got invalid 'HI' message on an outgoing connection; ignoring");
OMQ_LOG(warn, "Got invalid 'HI' message on an outgoing connection; ignoring");
return true;
}
LMQ_LOG(debug, "Incoming client from ", peer_address(parts.back()), " sent HI, replying with HELLO");
OMQ_LOG(debug, "Incoming client from ", peer_address(parts.back()), " sent HI, replying with HELLO");
try {
send_routed_message(sock, std::string{route}, "HELLO");
} catch (const std::exception &e) { LMQ_LOG(warn, "Couldn't reply with HELLO: ", e.what()); }
} catch (const std::exception &e) { OMQ_LOG(warn, "Couldn't reply with HELLO: ", e.what()); }
return true;
} else if (cmd == "HELLO") {
if (incoming) {
LMQ_LOG(warn, "Got invalid 'HELLO' message on an incoming connection; ignoring");
OMQ_LOG(warn, "Got invalid 'HELLO' message on an incoming connection; ignoring");
return true;
}
auto it = std::find_if(pending_connects.begin(), pending_connects.end(),
[&](auto& pc) { return std::get<int64_t>(pc) == conn_id; });
if (it == pending_connects.end()) {
LMQ_LOG(warn, "Got invalid 'HELLO' message on an already handshaked incoming connection; ignoring");
OMQ_LOG(warn, "Got invalid 'HELLO' message on an already handshaked incoming connection; ignoring");
return true;
}
auto& pc = *it;
auto pit = peers.find(std::get<int64_t>(pc));
if (pit == peers.end()) {
LMQ_LOG(warn, "Got invalid 'HELLO' message with invalid conn_id; ignoring");
OMQ_LOG(warn, "Got invalid 'HELLO' message with invalid conn_id; ignoring");
return true;
}
LMQ_LOG(debug, "Got initial HELLO server response from ", peer_address(parts.back()));
OMQ_LOG(debug, "Got initial HELLO server response from ", peer_address(parts.back()));
proxy_schedule_reply_job([on_success=std::move(std::get<ConnectSuccess>(pc)),
conn=pit->first] {
on_success(conn);
@ -667,10 +667,10 @@ bool OxenMQ::proxy_handle_builtin(int64_t conn_id, zmq::socket_t& sock, std::vec
return true;
} else if (cmd == "BYE") {
if (!incoming) {
LMQ_LOG(debug, "BYE command received; disconnecting from ", peer_address(parts.back()));
OMQ_LOG(debug, "BYE command received; disconnecting from ", peer_address(parts.back()));
proxy_close_connection(conn_id, 0s);
} else {
LMQ_LOG(warn, "Got invalid 'BYE' command on an incoming socket; ignoring");
OMQ_LOG(warn, "Got invalid 'BYE' command on an incoming socket; ignoring");
}
return true;
@ -688,7 +688,7 @@ bool OxenMQ::proxy_handle_builtin(int64_t conn_id, zmq::socket_t& sock, std::vec
// pre-1.1.0 sent just a plain UNKNOWNCOMMAND (without the actual command); this was not
// useful, but also this response is *expected* for things 1.0.5 didn't understand, like
// FORBIDDEN_SN: so log it only at debug level and move on.
LMQ_LOG(debug, "Received plain UNKNOWNCOMMAND; remote is probably an older oxenmq. Ignoring.");
OMQ_LOG(debug, "Received plain UNKNOWNCOMMAND; remote is probably an older oxenmq. Ignoring.");
return true;
}
@ -696,16 +696,16 @@ bool OxenMQ::proxy_handle_builtin(int64_t conn_id, zmq::socket_t& sock, std::vec
std::string reply_tag{view(parts[2 + incoming])};
auto it = pending_requests.find(reply_tag);
if (it != pending_requests.end()) {
LMQ_LOG(debug, "Received ", cmd, " REPLY for pending command ", to_hex(reply_tag), "; scheduling failure callback");
OMQ_LOG(debug, "Received ", cmd, " REPLY for pending command ", to_hex(reply_tag), "; scheduling failure callback");
proxy_schedule_reply_job([callback=std::move(it->second.second), cmd=std::string{cmd}] {
callback(false, {{std::move(cmd)}});
});
pending_requests.erase(it);
} else {
LMQ_LOG(warn, "Received REPLY with unknown or already handled reply tag (", to_hex(reply_tag), "); ignoring");
OMQ_LOG(warn, "Received REPLY with unknown or already handled reply tag (", to_hex(reply_tag), "); ignoring");
}
} else {
LMQ_LOG(warn, "Received ", cmd, ':', (parts.size() > 1 + incoming ? view(parts[1 + incoming]) : "(unknown command)"sv),
OMQ_LOG(warn, "Received ", cmd, ':', (parts.size() > 1 + incoming ? view(parts[1 + incoming]) : "(unknown command)"sv),
" from ", peer_address(parts.back()));
}
return true;

View File

@ -17,29 +17,29 @@ namespace {
// received. If "QUIT" was received, replies with "QUITTING" on the socket and closes it, then
// returns false.
[[gnu::always_inline]] inline
bool worker_wait_for(OxenMQ& lmq, zmq::socket_t& sock, std::vector<zmq::message_t>& parts, const std::string_view worker_id, const std::string_view expect) {
bool worker_wait_for(OxenMQ& omq, zmq::socket_t& sock, std::vector<zmq::message_t>& parts, const std::string_view worker_id, const std::string_view expect) {
while (true) {
lmq.log(LogLevel::trace, __FILE__, __LINE__, "worker ", worker_id, " waiting for ", expect);
omq.log(LogLevel::trace, __FILE__, __LINE__, "worker ", worker_id, " waiting for ", expect);
parts.clear();
recv_message_parts(sock, parts);
if (parts.size() != 1) {
lmq.log(LogLevel::error, __FILE__, __LINE__, "Internal error: worker ", worker_id, " received invalid ", parts.size(), "-part control msg");
omq.log(LogLevel::error, __FILE__, __LINE__, "Internal error: worker ", worker_id, " received invalid ", parts.size(), "-part control msg");
continue;
}
auto command = view(parts[0]);
if (command == expect) {
#ifndef NDEBUG
lmq.log(LogLevel::trace, __FILE__, __LINE__, "Worker ", worker_id, " received waited-for ", expect, " command");
omq.log(LogLevel::trace, __FILE__, __LINE__, "Worker ", worker_id, " received waited-for ", expect, " command");
#endif
return true;
} else if (command == "QUIT"sv) {
lmq.log(LogLevel::debug, __FILE__, __LINE__, "Worker ", worker_id, " received QUIT command, shutting down");
omq.log(LogLevel::debug, __FILE__, __LINE__, "Worker ", worker_id, " received QUIT command, shutting down");
detail::send_control(sock, "QUITTING");
sock.set(zmq::sockopt::linger, 1000);
sock.close();
return false;
} else {
lmq.log(LogLevel::error, __FILE__, __LINE__, "Internal error: worker ", worker_id, " received invalid command: `", command, "'");
omq.log(LogLevel::error, __FILE__, __LINE__, "Internal error: worker ", worker_id, " received invalid command: `", command, "'");
}
}
}
@ -50,7 +50,7 @@ void OxenMQ::worker_thread(unsigned int index, std::optional<std::string> tagged
std::string routing_id = (tagged ? "t" : "w") + std::to_string(index); // for routing
std::string_view worker_id{tagged ? *tagged : routing_id}; // for debug
[[maybe_unused]] std::string thread_name = tagged.value_or("lmq-" + routing_id);
[[maybe_unused]] std::string thread_name = tagged.value_or("omq-" + routing_id);
#if defined(__linux__) || defined(__sun) || defined(__MINGW32__)
if (thread_name.size() > 15) thread_name.resize(15);
pthread_setname_np(pthread_self(), thread_name.c_str());
@ -62,7 +62,7 @@ void OxenMQ::worker_thread(unsigned int index, std::optional<std::string> tagged
zmq::socket_t sock{context, zmq::socket_type::dealer};
sock.set(zmq::sockopt::routing_id, routing_id);
LMQ_LOG(debug, "New worker thread ", worker_id, " (", routing_id, ") started");
OMQ_LOG(debug, "New worker thread ", worker_id, " (", routing_id, ") started");
sock.connect(SN_ADDR_WORKERS);
if (tagged)
detail::send_control(sock, "STARTING");
@ -101,15 +101,15 @@ void OxenMQ::worker_thread(unsigned int index, std::optional<std::string> tagged
if (run.is_batch_job) {
auto* batch = var::get<detail::Batch*>(run.to_run);
if (run.batch_jobno >= 0) {
LMQ_TRACE("worker thread ", worker_id, " running batch ", batch, "#", run.batch_jobno);
OMQ_TRACE("worker thread ", worker_id, " running batch ", batch, "#", run.batch_jobno);
batch->run_job(run.batch_jobno);
} else if (run.batch_jobno == -1) {
LMQ_TRACE("worker thread ", worker_id, " running batch ", batch, " completion");
OMQ_TRACE("worker thread ", worker_id, " running batch ", batch, " completion");
batch->job_completion();
}
} else if (run.is_injected) {
auto& func = var::get<std::function<void()>>(run.to_run);
LMQ_TRACE("worker thread ", worker_id, " invoking injected command ", run.command);
OMQ_TRACE("worker thread ", worker_id, " invoking injected command ", run.command);
func();
func = nullptr;
} else {
@ -118,7 +118,7 @@ void OxenMQ::worker_thread(unsigned int index, std::optional<std::string> tagged
message.remote = std::move(run.remote);
message.data.clear();
LMQ_TRACE("Got incoming command from ", message.remote, "/", message.conn, message.conn.route.empty() ? " (outgoing)" : " (incoming)");
OMQ_TRACE("Got incoming command from ", message.remote, "/", message.conn, message.conn.route.empty() ? " (outgoing)" : " (incoming)");
auto& [callback, is_request] = *var::get<const std::pair<CommandCallback, bool>*>(run.to_run);
if (is_request) {
@ -130,26 +130,26 @@ void OxenMQ::worker_thread(unsigned int index, std::optional<std::string> tagged
message.data.emplace_back(m.data<char>(), m.size());
}
LMQ_TRACE("worker thread ", worker_id, " invoking ", run.command, " callback with ", message.data.size(), " message parts");
OMQ_TRACE("worker thread ", worker_id, " invoking ", run.command, " callback with ", message.data.size(), " message parts");
callback(message);
}
}
catch (const bt_deserialize_invalid& e) {
LMQ_LOG(warn, worker_id, " deserialization failed: ", e.what(), "; ignoring request");
OMQ_LOG(warn, worker_id, " deserialization failed: ", e.what(), "; ignoring request");
}
#ifndef BROKEN_APPLE_VARIANT
catch (const std::bad_variant_access& e) {
LMQ_LOG(warn, worker_id, " deserialization failed: found unexpected serialized type (", e.what(), "); ignoring request");
OMQ_LOG(warn, worker_id, " deserialization failed: found unexpected serialized type (", e.what(), "); ignoring request");
}
#endif
catch (const std::out_of_range& e) {
LMQ_LOG(warn, worker_id, " deserialization failed: invalid data - required field missing (", e.what(), "); ignoring request");
OMQ_LOG(warn, worker_id, " deserialization failed: invalid data - required field missing (", e.what(), "); ignoring request");
}
catch (const std::exception& e) {
LMQ_LOG(warn, worker_id, " caught exception when processing command: ", e.what());
OMQ_LOG(warn, worker_id, " caught exception when processing command: ", e.what());
}
catch (...) {
LMQ_LOG(warn, worker_id, " caught non-standard exception when processing command");
OMQ_LOG(warn, worker_id, " caught non-standard exception when processing command");
}
// Tell the proxy thread that we are ready for another job
@ -177,11 +177,11 @@ OxenMQ::run_info& OxenMQ::get_idle_worker() {
void OxenMQ::proxy_worker_message(std::vector<zmq::message_t>& parts) {
// Process messages sent by workers
if (parts.size() != 2) {
LMQ_LOG(error, "Received send invalid ", parts.size(), "-part message");
OMQ_LOG(error, "Received send invalid ", parts.size(), "-part message");
return;
}
auto route = view(parts[0]), cmd = view(parts[1]);
LMQ_TRACE("worker message from ", route);
OMQ_TRACE("worker message from ", route);
assert(route.size() >= 2 && (route[0] == 'w' || route[0] == 't') && route[1] >= '0' && route[1] <= '9');
bool tagged_worker = route[0] == 't';
std::string_view worker_id_str{&route[1], route.size()-1}; // Chop off the leading "w" (or "t")
@ -192,15 +192,15 @@ void OxenMQ::proxy_worker_message(std::vector<zmq::message_t>& parts) {
: worker_id >= workers.size() // regular worker ids are indexed from 0 to N-1
)
) {
LMQ_LOG(error, "Worker id '", route, "' is invalid, unable to process worker command");
OMQ_LOG(error, "Worker id '", route, "' is invalid, unable to process worker command");
return;
}
auto& run = tagged_worker ? std::get<run_info>(tagged_workers[worker_id - 1]) : workers[worker_id];
LMQ_TRACE("received ", cmd, " command from ", route);
OMQ_TRACE("received ", cmd, " command from ", route);
if (cmd == "RAN"sv) {
LMQ_TRACE("Worker ", route, " finished ", run.is_batch_job ? "batch job" : run.command);
OMQ_TRACE("Worker ", route, " finished ", run.is_batch_job ? "batch job" : run.command);
if (run.is_batch_job) {
if (tagged_worker) {
std::get<bool>(tagged_workers[worker_id - 1]) = false;
@ -218,15 +218,15 @@ void OxenMQ::proxy_worker_message(std::vector<zmq::message_t>& parts) {
auto [state, thread] = batch->job_finished();
if (state == detail::BatchState::complete) {
if (thread == -1) { // run directly in proxy
LMQ_TRACE("Completion job running directly in proxy");
OMQ_TRACE("Completion job running directly in proxy");
try {
batch->job_completion(); // RUN DIRECTLY IN PROXY THREAD
} catch (const std::exception &e) {
// Raise these to error levels: the caller really shouldn't be doing
// anything non-trivial in an in-proxy completion function!
LMQ_LOG(error, "proxy thread caught exception when processing in-proxy completion command: ", e.what());
OMQ_LOG(error, "proxy thread caught exception when processing in-proxy completion command: ", e.what());
} catch (...) {
LMQ_LOG(error, "proxy thread caught non-standard exception when processing in-proxy completion command");
OMQ_LOG(error, "proxy thread caught non-standard exception when processing in-proxy completion command");
}
clear_job = true;
} else {
@ -255,16 +255,16 @@ void OxenMQ::proxy_worker_message(std::vector<zmq::message_t>& parts) {
run.cat->active_threads--;
}
if (max_workers == 0) { // Shutting down
LMQ_TRACE("Telling worker ", route, " to quit");
OMQ_TRACE("Telling worker ", route, " to quit");
route_control(workers_socket, route, "QUIT");
} else if (!tagged_worker) {
idle_workers.push_back(worker_id);
}
} else if (cmd == "QUITTING"sv) {
run.worker_thread.join();
LMQ_LOG(debug, "Worker ", route, " exited normally");
OMQ_LOG(debug, "Worker ", route, " exited normally");
} else {
LMQ_LOG(error, "Worker ", route, " sent unknown control message: `", cmd, "'");
OMQ_LOG(error, "Worker ", route, " sent unknown control message: `", cmd, "'");
}
}
@ -289,7 +289,7 @@ void OxenMQ::proxy_to_worker(int64_t conn_id, zmq::socket_t& sock, std::vector<z
: peers.find(conn_id);
if (it == peers.end()) {
LMQ_LOG(warn, "Internal error: connection id ", conn_id, " not found");
OMQ_LOG(warn, "Internal error: connection id ", conn_id, " not found");
return;
}
peer = &it->second;
@ -348,12 +348,12 @@ void OxenMQ::proxy_to_worker(int64_t conn_id, zmq::socket_t& sock, std::vector<z
if (category.active_threads >= category.reserved_threads && active_workers() >= general_workers) {
// No free reserved or general spots, try to queue it for later
if (category.max_queue >= 0 && category.queued >= category.max_queue) {
LMQ_LOG(warn, "No space to queue incoming command ", command, "; already have ", category.queued,
OMQ_LOG(warn, "No space to queue incoming command ", command, "; already have ", category.queued,
"commands queued in that category (max ", category.max_queue, "); dropping message");
return;
}
LMQ_LOG(debug, "No available free workers, queuing ", command, " for later");
OMQ_LOG(debug, "No available free workers, queuing ", command, " for later");
ConnectionID conn{peer->service_node ? ConnectionID::SN_ID : conn_id, peer->pubkey, std::move(tmp_peer.route)};
pending_commands.emplace_back(category, std::move(command), std::move(data_parts), cat_call.second,
std::move(conn), std::move(access), peer_address(parts[command_part_index]));
@ -362,7 +362,7 @@ void OxenMQ::proxy_to_worker(int64_t conn_id, zmq::socket_t& sock, std::vector<z
}
if (cat_call.second->second /*is_request*/ && data_parts.empty()) {
LMQ_LOG(warn, "Received an invalid request command with no reply tag; dropping message");
OMQ_LOG(warn, "Received an invalid request command with no reply tag; dropping message");
return;
}
@ -379,7 +379,7 @@ void OxenMQ::proxy_to_worker(int64_t conn_id, zmq::socket_t& sock, std::vector<z
if (outgoing)
peer->activity(); // outgoing connection activity, pump the activity timer
LMQ_TRACE("Forwarding incoming ", run.command, " from ", run.conn, " @ ", peer_address(parts[command_part_index]),
OMQ_TRACE("Forwarding incoming ", run.command, " from ", run.conn, " @ ", peer_address(parts[command_part_index]),
" to worker ", run.worker_routing_id);
proxy_run_worker(run);
@ -400,18 +400,18 @@ void OxenMQ::proxy_inject_task(injected_task task) {
if (category.active_threads >= category.reserved_threads && active_workers() >= general_workers) {
// No free worker slot, queue for later
if (category.max_queue >= 0 && category.queued >= category.max_queue) {
LMQ_LOG(warn, "No space to queue injected task ", task.command, "; already have ", category.queued,
OMQ_LOG(warn, "No space to queue injected task ", task.command, "; already have ", category.queued,
"commands queued in that category (max ", category.max_queue, "); dropping task");
return;
}
LMQ_LOG(debug, "No available free workers for injected task ", task.command, "; queuing for later");
OMQ_LOG(debug, "No available free workers for injected task ", task.command, "; queuing for later");
pending_commands.emplace_back(category, std::move(task.command), std::move(task.callback), std::move(task.remote));
category.queued++;
return;
}
auto& run = get_idle_worker();
LMQ_TRACE("Forwarding incoming injected task ", task.command, " from ", task.remote, " to worker ", run.worker_routing_id);
OMQ_TRACE("Forwarding incoming injected task ", task.command, " from ", task.remote, " to worker ", run.worker_routing_id);
run.load(&category, std::move(task.command), std::move(task.remote), std::move(task.callback));
proxy_run_worker(run);

View File

@ -1,7 +1,7 @@
add_subdirectory(Catch2)
set(LMQ_TEST_SRC
add_executable(tests
main.cpp
test_address.cpp
test_batch.cpp
@ -14,9 +14,7 @@ set(LMQ_TEST_SRC
test_requests.cpp
test_tagged_threads.cpp
test_timer.cpp
)
add_executable(tests ${LMQ_TEST_SRC})
)
find_package(Threads)

View File

@ -25,7 +25,7 @@ void continue_big_task(std::vector<oxenmq::job_result<double>> results) {
done.set_value({sum, exc_count});
}
void start_big_task(oxenmq::OxenMQ& lmq) {
void start_big_task(oxenmq::OxenMQ& omq) {
size_t num_jobs = 32;
oxenmq::Batch<double /*return type*/> batch;
@ -36,21 +36,21 @@ void start_big_task(oxenmq::OxenMQ& lmq) {
batch.completion(&continue_big_task);
lmq.batch(std::move(batch));
omq.batch(std::move(batch));
}
TEST_CASE("batching many small jobs", "[batch-many]") {
oxenmq::OxenMQ lmq{
oxenmq::OxenMQ omq{
"", "", // generate ephemeral keys
false, // not a service node
[](auto) { return ""; },
};
lmq.set_general_threads(4);
lmq.set_batch_threads(4);
lmq.start();
omq.set_general_threads(4);
omq.set_batch_threads(4);
omq.start();
start_big_task(lmq);
start_big_task(omq);
auto sum = done.get_future().get();
auto lock = catch_lock();
REQUIRE( sum.first == 1337.0 );
@ -58,14 +58,14 @@ TEST_CASE("batching many small jobs", "[batch-many]") {
}
TEST_CASE("batch exception propagation", "[batch-exceptions]") {
oxenmq::OxenMQ lmq{
oxenmq::OxenMQ omq{
"", "", // generate ephemeral keys
false, // not a service node
[](auto) { return ""; },
};
lmq.set_general_threads(4);
lmq.set_batch_threads(4);
lmq.start();
omq.set_general_threads(4);
omq.set_batch_threads(4);
omq.start();
std::promise<void> done_promise;
std::future<void> done_future = done_promise.get_future();
@ -83,7 +83,7 @@ TEST_CASE("batch exception propagation", "[batch-exceptions]") {
REQUIRE_THROWS_MATCHES( results[1].get() == 0, std::domain_error, Message("bad value 2") );
done_promise.set_value();
});
lmq.batch(std::move(batch));
omq.batch(std::move(batch));
done_future.get();
}
@ -105,7 +105,7 @@ TEST_CASE("batch exception propagation", "[batch-exceptions]") {
REQUIRE_THROWS_MATCHES( results[1].get(), std::domain_error, Message("bad value 2") );
done_promise.set_value();
});
lmq.batch(std::move(batch));
omq.batch(std::move(batch));
done_future.get();
}
@ -120,7 +120,7 @@ TEST_CASE("batch exception propagation", "[batch-exceptions]") {
REQUIRE_THROWS_MATCHES( results[1].get(), std::domain_error, Message("bad value 2") );
done_promise.set_value();
});
lmq.batch(std::move(batch));
omq.batch(std::move(batch));
done_future.get();
}
}

View File

@ -244,7 +244,7 @@ TEST_CASE("unique connection IDs", "[connect][id]") {
TEST_CASE("SN disconnections", "[connect][disconnect]") {
std::vector<std::unique_ptr<OxenMQ>> lmq;
std::vector<std::unique_ptr<OxenMQ>> omq;
std::vector<std::string> pubkey, privkey;
std::unordered_map<std::string, std::string> conn;
REQUIRE(sodium_init() != -1);
@ -258,13 +258,13 @@ TEST_CASE("SN disconnections", "[connect][disconnect]") {
}
std::atomic<int> his{0};
for (int i = 0; i < pubkey.size(); i++) {
lmq.push_back(std::make_unique<OxenMQ>(
omq.push_back(std::make_unique<OxenMQ>(
pubkey[i], privkey[i], true,
[conn](auto pk) { auto it = conn.find((std::string) pk); if (it != conn.end()) return it->second; return ""s; },
get_logger("S" + std::to_string(i) + "» "),
LogLevel::trace
));
auto& server = *lmq.back();
auto& server = *omq.back();
server.listen_curve(conn[pubkey[i]]);
server.add_category("sn", Access{AuthLevel::none, true})
@ -273,12 +273,12 @@ TEST_CASE("SN disconnections", "[connect][disconnect]") {
server.start();
}
lmq[0]->send(pubkey[1], "sn.hi");
lmq[0]->send(pubkey[2], "sn.hi");
lmq[2]->send(pubkey[0], "sn.hi");
lmq[2]->send(pubkey[1], "sn.hi");
lmq[1]->send(pubkey[0], "BYE");
lmq[0]->send(pubkey[2], "sn.hi");
omq[0]->send(pubkey[1], "sn.hi");
omq[0]->send(pubkey[2], "sn.hi");
omq[2]->send(pubkey[0], "sn.hi");
omq[2]->send(pubkey[1], "sn.hi");
omq[1]->send(pubkey[0], "BYE");
omq[0]->send(pubkey[2], "sn.hi");
std::this_thread::sleep_for(50ms * TIME_DILATION);
auto lock = catch_lock();

View File

@ -3,13 +3,13 @@
#include <future>
TEST_CASE("tagged thread start functions", "[tagged][start]") {
oxenmq::OxenMQ lmq{get_logger(""), LogLevel::trace};
oxenmq::OxenMQ omq{get_logger(""), LogLevel::trace};
lmq.set_general_threads(2);
lmq.set_batch_threads(2);
auto t_abc = lmq.add_tagged_thread("abc");
omq.set_general_threads(2);
omq.set_batch_threads(2);
auto t_abc = omq.add_tagged_thread("abc");
std::atomic<bool> start_called = false;
auto t_def = lmq.add_tagged_thread("def", [&] { start_called = true; });
auto t_def = omq.add_tagged_thread("def", [&] { start_called = true; });
std::this_thread::sleep_for(20ms);
{
@ -17,7 +17,7 @@ TEST_CASE("tagged thread start functions", "[tagged][start]") {
REQUIRE_FALSE( start_called );
}
lmq.start();
omq.start();
wait_for([&] { return start_called.load(); });
{
auto lock = catch_lock();
@ -26,24 +26,24 @@ TEST_CASE("tagged thread start functions", "[tagged][start]") {
}
TEST_CASE("tagged threads quit-before-start", "[tagged][quit]") {
auto lmq = std::make_unique<oxenmq::OxenMQ>(get_logger(""), LogLevel::trace);
auto t_abc = lmq->add_tagged_thread("abc");
REQUIRE_NOTHROW(lmq.reset());
auto omq = std::make_unique<oxenmq::OxenMQ>(get_logger(""), LogLevel::trace);
auto t_abc = omq->add_tagged_thread("abc");
REQUIRE_NOTHROW(omq.reset());
}
TEST_CASE("batch jobs to tagged threads", "[tagged][batch]") {
oxenmq::OxenMQ lmq{get_logger(""), LogLevel::trace};
oxenmq::OxenMQ omq{get_logger(""), LogLevel::trace};
lmq.set_general_threads(2);
lmq.set_batch_threads(2);
omq.set_general_threads(2);
omq.set_batch_threads(2);
std::thread::id id_abc, id_def;
auto t_abc = lmq.add_tagged_thread("abc", [&] { id_abc = std::this_thread::get_id(); });
auto t_def = lmq.add_tagged_thread("def", [&] { id_def = std::this_thread::get_id(); });
lmq.start();
auto t_abc = omq.add_tagged_thread("abc", [&] { id_abc = std::this_thread::get_id(); });
auto t_def = omq.add_tagged_thread("def", [&] { id_def = std::this_thread::get_id(); });
omq.start();
std::atomic<bool> done = false;
std::thread::id id;
lmq.job([&] { id = std::this_thread::get_id(); done = true; });
omq.job([&] { id = std::this_thread::get_id(); done = true; });
wait_for([&] { return done.load(); });
{
auto lock = catch_lock();
@ -52,7 +52,7 @@ TEST_CASE("batch jobs to tagged threads", "[tagged][batch]") {
}
done = false;
lmq.job([&] { id = std::this_thread::get_id(); done = true; }, t_abc);
omq.job([&] { id = std::this_thread::get_id(); done = true; }, t_abc);
wait_for([&] { return done.load(); });
{
auto lock = catch_lock();
@ -60,7 +60,7 @@ TEST_CASE("batch jobs to tagged threads", "[tagged][batch]") {
}
done = false;
lmq.job([&] { id = std::this_thread::get_id(); done = true; }, t_def);
omq.job([&] { id = std::this_thread::get_id(); done = true; }, t_def);
wait_for([&] { return done.load(); });
{
auto lock = catch_lock();
@ -69,16 +69,16 @@ TEST_CASE("batch jobs to tagged threads", "[tagged][batch]") {
std::atomic<bool> sleep = true;
auto sleeper = [&] { for (int i = 0; sleep && i < 10; i++) { std::this_thread::sleep_for(25ms); } };
lmq.job(sleeper);
lmq.job(sleeper);
omq.job(sleeper);
omq.job(sleeper);
// This one should stall:
std::atomic<bool> bad = false;
lmq.job([&] { bad = true; });
omq.job([&] { bad = true; });
std::this_thread::sleep_for(50ms);
done = false;
lmq.job([&] { id = std::this_thread::get_id(); done = true; }, t_abc);
omq.job([&] { id = std::this_thread::get_id(); done = true; }, t_abc);
wait_for([&] { return done.load(); });
{
auto lock = catch_lock();
@ -90,9 +90,9 @@ TEST_CASE("batch jobs to tagged threads", "[tagged][batch]") {
// We can queue up a bunch of jobs which should all happen in order, and all on the abc thread.
std::vector<int> v;
for (int i = 0; i < 100; i++) {
lmq.job([&] { if (std::this_thread::get_id() == id_abc) v.push_back(v.size()); }, t_abc);
omq.job([&] { if (std::this_thread::get_id() == id_abc) v.push_back(v.size()); }, t_abc);
}
lmq.job([&] { done = true; }, t_abc);
omq.job([&] { done = true; }, t_abc);
wait_for([&] { return done.load(); });
{
auto lock = catch_lock();
@ -111,13 +111,13 @@ TEST_CASE("batch jobs to tagged threads", "[tagged][batch]") {
}
TEST_CASE("batch job completion on tagged threads", "[tagged][batch-completion]") {
oxenmq::OxenMQ lmq{get_logger(""), LogLevel::trace};
oxenmq::OxenMQ omq{get_logger(""), LogLevel::trace};
lmq.set_general_threads(4);
lmq.set_batch_threads(4);
omq.set_general_threads(4);
omq.set_batch_threads(4);
std::thread::id id_abc;
auto t_abc = lmq.add_tagged_thread("abc", [&] { id_abc = std::this_thread::get_id(); });
lmq.start();
auto t_abc = omq.add_tagged_thread("abc", [&] { id_abc = std::this_thread::get_id(); });
omq.start();
oxenmq::Batch<int> batch;
for (int i = 1; i < 10; i++)
@ -130,7 +130,7 @@ TEST_CASE("batch job completion on tagged threads", "[tagged][batch-completion]"
sum += r.get();
result_sum = std::this_thread::get_id() == id_abc ? sum : -sum;
}, t_abc);
lmq.batch(std::move(batch));
omq.batch(std::move(batch));
wait_for([&] { return result_sum.load() != -1; });
{
auto lock = catch_lock();
@ -140,19 +140,19 @@ TEST_CASE("batch job completion on tagged threads", "[tagged][batch-completion]"
TEST_CASE("timer job completion on tagged threads", "[tagged][timer]") {
oxenmq::OxenMQ lmq{get_logger(""), LogLevel::trace};
oxenmq::OxenMQ omq{get_logger(""), LogLevel::trace};
lmq.set_general_threads(4);
lmq.set_batch_threads(4);
omq.set_general_threads(4);
omq.set_batch_threads(4);
std::thread::id id_abc;
auto t_abc = lmq.add_tagged_thread("abc", [&] { id_abc = std::this_thread::get_id(); });
lmq.start();
auto t_abc = omq.add_tagged_thread("abc", [&] { id_abc = std::this_thread::get_id(); });
omq.start();
std::atomic<int> ticks = 0;
std::atomic<int> abc_ticks = 0;
lmq.add_timer([&] { ticks++; }, 10ms);
lmq.add_timer([&] { if (std::this_thread::get_id() == id_abc) abc_ticks++; }, 10ms, true, t_abc);
omq.add_timer([&] { ticks++; }, 10ms);
omq.add_timer([&] { if (std::this_thread::get_id() == id_abc) abc_ticks++; }, 10ms, true, t_abc);
wait_for([&] { return ticks.load() > 2 && abc_ticks > 2; });
{