From 375cfab4cea1de83cb1aafce1a4fb63e972c907d Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Tue, 30 Nov 2021 14:10:47 -0400 Subject: [PATCH] Rebrand variables LMQ -> OMQ Various things still were using the lmq (or LMQ) names; change them all to omq/OMQ. --- README.md | 14 ++-- oxenmq/auth.cpp | 40 +++++----- oxenmq/connections.cpp | 46 ++++++------ oxenmq/jobs.cpp | 18 ++--- oxenmq/message.h | 6 +- oxenmq/oxenmq-internal.h | 10 +-- oxenmq/oxenmq.cpp | 26 +++---- oxenmq/oxenmq.h | 30 ++++---- oxenmq/proxy.cpp | 136 +++++++++++++++++----------------- oxenmq/worker.cpp | 74 +++++++++--------- tests/CMakeLists.txt | 6 +- tests/test_batch.cpp | 28 +++---- tests/test_connect.cpp | 18 ++--- tests/test_tagged_threads.cpp | 74 +++++++++--------- 14 files changed, 262 insertions(+), 264 deletions(-) diff --git a/README.md b/README.md index 057ab83..7655ec1 100644 --- a/README.md +++ b/README.md @@ -110,8 +110,8 @@ dictionaries. See `oxenmq/bt_serialize.h` if you want a bt serializer/deseriali Sending a command to a peer is done by using a connection ID, and generally falls into either a `send()` method or a `request()` method. - lmq.send(conn, "category.command", "some data"); - lmq.request(conn, "category.command", [](bool success, std::vector data) { + omq.send(conn, "category.command", "some data"); + omq.request(conn, "category.command", [](bool success, std::vector data) { if (success) { std::cout << "Remote replied: " << data.at(0) << "\n"; } }); The connection ID generally has two possible values: @@ -127,13 +127,13 @@ The connection ID generally has two possible values: ```C++ // Send to a service node, establishing a connection if necessary: std::string my_sn = ...; // 32-byte pubkey of a known SN - lmq.send(my_sn, "sn.explode", "{ \"seconds\": 30 }"); + omq.send(my_sn, "sn.explode", "{ \"seconds\": 30 }"); // Connect to a remote by address then send it something - auto conn = lmq.connect_remote("tcp://127.0.0.1:4567", + auto conn = omq.connect_remote("tcp://127.0.0.1:4567", [](ConnectionID c) { std::cout << "Connected!\n"; }, [](ConnectionID c, string_view f) { std::cout << "Connect failed: " << f << "\n" }); - lmq.request(conn, "rpc.get_height", [](bool s, std::vector d) { + omq.request(conn, "rpc.get_height", [](bool s, std::vector d) { if (s && d.size() == 1) std::cout << "Current height: " << d[0] << "\n"; else @@ -332,7 +332,7 @@ void start_big_task() { batch.completion(&continue_big_task); - lmq.batch(std::move(batch)); + omq.batch(std::move(batch)); // ... to be continued in `continue_big_task` after all the jobs finish // Can do other things here, but note that continue_big_task could run @@ -347,7 +347,7 @@ going to help you hurt yourself like that. ### Single-job queuing -As a shortcut there is a `lmq.job(...)` method that schedules a single task (with no return value) +As a shortcut there is a `omq.job(...)` method that schedules a single task (with no return value) in the batch job queue. This is useful when some event requires triggering some other event, but you don't need to wait for or collect its result. (Internally this is just a convenience method around creating a single-job, no-completion Batch job). diff --git a/oxenmq/auth.cpp b/oxenmq/auth.cpp index a114b08..6cf76ad 100644 --- a/oxenmq/auth.cpp +++ b/oxenmq/auth.cpp @@ -37,22 +37,22 @@ bool OxenMQ::proxy_check_auth(int64_t conn_id, bool outgoing, const peer_info& p std::string reply; if (!cat_call.first) { - LMQ_LOG(warn, "Invalid command '", command, "' sent by remote [", to_hex(peer.pubkey), "]/", peer_address(cmd)); + OMQ_LOG(warn, "Invalid command '", command, "' sent by remote [", to_hex(peer.pubkey), "]/", peer_address(cmd)); reply = "UNKNOWNCOMMAND"; } else if (peer.auth_level < cat_call.first->access.auth) { - LMQ_LOG(warn, "Access denied to ", command, " for peer [", to_hex(peer.pubkey), "]/", peer_address(cmd), + OMQ_LOG(warn, "Access denied to ", command, " for peer [", to_hex(peer.pubkey), "]/", peer_address(cmd), ": peer auth level ", peer.auth_level, " < ", cat_call.first->access.auth); reply = "FORBIDDEN"; } else if (cat_call.first->access.local_sn && !local_service_node) { - LMQ_LOG(warn, "Access denied to ", command, " for peer [", to_hex(peer.pubkey), "]/", peer_address(cmd), + OMQ_LOG(warn, "Access denied to ", command, " for peer [", to_hex(peer.pubkey), "]/", peer_address(cmd), ": that command is only available when this OxenMQ is running in service node mode"); reply = "NOT_A_SERVICE_NODE"; } else if (cat_call.first->access.remote_sn && !peer.service_node) { - LMQ_LOG(warn, "Access denied to ", command, " for peer [", to_hex(peer.pubkey), "]/", peer_address(cmd), + OMQ_LOG(warn, "Access denied to ", command, " for peer [", to_hex(peer.pubkey), "]/", peer_address(cmd), ": remote is not recognized as a service node"); reply = "FORBIDDEN_SN"; } else if (cat_call.second->second /*is_request*/ && data.empty()) { - LMQ_LOG(warn, "Received an invalid request for '", command, "' with no reply tag from remote [", + OMQ_LOG(warn, "Received an invalid request for '", command, "' with no reply tag from remote [", to_hex(peer.pubkey), "]/", peer_address(cmd)); reply = "NO_REPLY_TAG"; } else { @@ -75,7 +75,7 @@ bool OxenMQ::proxy_check_auth(int64_t conn_id, bool outgoing, const peer_info& p send_message_parts(connections.at(conn_id), msgs); } catch (const zmq::error_t& err) { /* can't send: possibly already disconnected. Ignore. */ - LMQ_LOG(debug, "Couldn't send auth failure message ", reply, " to peer [", to_hex(peer.pubkey), "]/", peer_address(cmd), ": ", err.what()); + OMQ_LOG(debug, "Couldn't send auth failure message ", reply, " to peer [", to_hex(peer.pubkey), "]/", peer_address(cmd), ": ", err.what()); } return false; @@ -97,7 +97,7 @@ void OxenMQ::proxy_set_active_sns(pubkey_set pubkeys) { for (auto it = pubkeys.begin(); it != pubkeys.end(); ) { auto& pk = *it; if (pk.size() != 32) { - LMQ_LOG(warn, "Invalid private key of length ", pk.size(), " (", to_hex(pk), ") passed to set_active_sns"); + OMQ_LOG(warn, "Invalid private key of length ", pk.size(), " (", to_hex(pk), ") passed to set_active_sns"); it = pubkeys.erase(it); continue; } @@ -106,7 +106,7 @@ void OxenMQ::proxy_set_active_sns(pubkey_set pubkeys) { ++it; } if (added.empty() && active_service_nodes.size() == pubkeys.size()) { - LMQ_LOG(debug, "set_active_sns(): new set of SNs is unchanged, skipping update"); + OMQ_LOG(debug, "set_active_sns(): new set of SNs is unchanged, skipping update"); return; } for (const auto& pk : active_service_nodes) { @@ -141,7 +141,7 @@ void OxenMQ::proxy_update_active_sns(pubkey_set added, pubkey_set removed) { for (auto it = removed.begin(); it != removed.end(); ) { const auto& pk = *it; if (pk.size() != 32) { - LMQ_LOG(warn, "Invalid private key of length ", pk.size(), " (", to_hex(pk), ") passed to update_active_sns (removed)"); + OMQ_LOG(warn, "Invalid private key of length ", pk.size(), " (", to_hex(pk), ") passed to update_active_sns (removed)"); it = removed.erase(it); } else if (!active_service_nodes.count(pk) || added.count(pk) /* added wins if in both */) { it = removed.erase(it); @@ -153,7 +153,7 @@ void OxenMQ::proxy_update_active_sns(pubkey_set added, pubkey_set removed) { for (auto it = added.begin(); it != added.end(); ) { const auto& pk = *it; if (pk.size() != 32) { - LMQ_LOG(warn, "Invalid private key of length ", pk.size(), " (", to_hex(pk), ") passed to update_active_sns (added)"); + OMQ_LOG(warn, "Invalid private key of length ", pk.size(), " (", to_hex(pk), ") passed to update_active_sns (added)"); it = added.erase(it); } else if (active_service_nodes.count(pk)) { it = added.erase(it); @@ -166,7 +166,7 @@ void OxenMQ::proxy_update_active_sns(pubkey_set added, pubkey_set removed) { } void OxenMQ::proxy_update_active_sns_clean(pubkey_set added, pubkey_set removed) { - LMQ_LOG(debug, "Updating SN auth status with +", added.size(), "/-", removed.size(), " pubkeys"); + OMQ_LOG(debug, "Updating SN auth status with +", added.size(), "/-", removed.size(), " pubkeys"); // For anything we remove we want close the connection to the SN (if outgoing), and remove the // stored peer_info (incoming or outgoing). @@ -179,7 +179,7 @@ void OxenMQ::proxy_update_active_sns_clean(pubkey_set added, pubkey_set removed) auto conn_id = it->second.conn_id; it = peers.erase(it); if (outgoing) { - LMQ_LOG(debug, "Closing outgoing connection to ", c); + OMQ_LOG(debug, "Closing outgoing connection to ", c); proxy_close_connection(conn_id, CLOSE_LINGER); } } @@ -207,7 +207,7 @@ void OxenMQ::process_zap_requests() { log(LogLevel::trace, __FILE__, __LINE__, o.str()); } else #endif - LMQ_LOG(debug, "Processing ZAP authentication request"); + OMQ_LOG(debug, "Processing ZAP authentication request"); // https://rfc.zeromq.org/spec:27/ZAP/ // @@ -240,7 +240,7 @@ void OxenMQ::process_zap_requests() { std::string &status_code = response_vals[2], &status_text = response_vals[3]; if (frames.size() < 6 || view(frames[0]) != "1.0") { - LMQ_LOG(error, "Bad ZAP authentication request: version != 1.0 or invalid ZAP message parts"); + OMQ_LOG(error, "Bad ZAP authentication request: version != 1.0 or invalid ZAP message parts"); status_code = "500"; status_text = "Internal error: invalid auth request"; } else { @@ -251,18 +251,18 @@ void OxenMQ::process_zap_requests() { } catch (...) {} if (bind_id >= bind.size()) { - LMQ_LOG(error, "Bad ZAP authentication request: invalid auth domain '", auth_domain, "'"); + OMQ_LOG(error, "Bad ZAP authentication request: invalid auth domain '", auth_domain, "'"); status_code = "400"; status_text = "Unknown authentication domain: " + std::string{auth_domain}; } else if (bind[bind_id].curve ? !(frames.size() == 7 && view(frames[5]) == "CURVE") : !(frames.size() == 6 && view(frames[5]) == "NULL")) { - LMQ_LOG(error, "Bad ZAP authentication request: invalid ", + OMQ_LOG(error, "Bad ZAP authentication request: invalid ", bind[bind_id].curve ? "CURVE" : "NULL", " authentication request"); status_code = "500"; status_text = "Invalid authentication request mechanism"; } else if (bind[bind_id].curve && frames[6].size() != 32) { - LMQ_LOG(error, "Bad ZAP authentication request: invalid request pubkey"); + OMQ_LOG(error, "Bad ZAP authentication request: invalid request pubkey"); status_code = "500"; status_text = "Invalid public key size for CURVE authentication"; } else { @@ -281,14 +281,14 @@ void OxenMQ::process_zap_requests() { } if (auth <= AuthLevel::denied || auth > AuthLevel::admin) { - LMQ_LOG(info, "Access denied for incoming ", view(frames[5]), (sn ? " service node" : " client"), + OMQ_LOG(info, "Access denied for incoming ", view(frames[5]), (sn ? " service node" : " client"), " connection from ", !user_id.empty() ? user_id + " at " : ""s, ip, " with initial auth level ", auth); status_code = "400"; status_text = "Access denied"; user_id.clear(); } else { - LMQ_LOG(debug, "Accepted incoming ", view(frames[5]), (sn ? " service node" : " client"), + OMQ_LOG(debug, "Accepted incoming ", view(frames[5]), (sn ? " service node" : " client"), " connection with authentication level ", auth, " from ", !user_id.empty() ? user_id + " at " : ""s, ip); @@ -301,7 +301,7 @@ void OxenMQ::process_zap_requests() { } } - LMQ_TRACE("ZAP request result: ", status_code, " ", status_text); + OMQ_TRACE("ZAP request result: ", status_code, " ", status_text); std::vector response; response.reserve(response_vals.size()); diff --git a/oxenmq/connections.cpp b/oxenmq/connections.cpp index 57ec6fa..e67dbc8 100644 --- a/oxenmq/connections.cpp +++ b/oxenmq/connections.cpp @@ -119,10 +119,10 @@ OxenMQ::proxy_connect_sn(std::string_view remote, std::string_view connect_hint, } if (peer) { - LMQ_TRACE("proxy asked to connect to ", to_hex(remote), "; reusing existing connection"); + OMQ_TRACE("proxy asked to connect to ", to_hex(remote), "; reusing existing connection"); if (peer->route.empty() /* == outgoing*/) { if (peer->idle_expiry < keep_alive) { - LMQ_LOG(debug, "updating existing outgoing peer connection idle expiry time from ", + OMQ_LOG(debug, "updating existing outgoing peer connection idle expiry time from ", peer->idle_expiry.count(), "ms to ", keep_alive.count(), "ms"); peer->idle_expiry = keep_alive; } @@ -130,12 +130,12 @@ OxenMQ::proxy_connect_sn(std::string_view remote, std::string_view connect_hint, } return {&connections[peer->conn_id], peer->route}; } else if (optional || incoming_only) { - LMQ_LOG(debug, "proxy asked for optional or incoming connection, but no appropriate connection exists so aborting connection attempt"); + OMQ_LOG(debug, "proxy asked for optional or incoming connection, but no appropriate connection exists so aborting connection attempt"); return {nullptr, ""s}; } // No connection so establish a new one - LMQ_LOG(debug, "proxy establishing new outbound connection to ", to_hex(remote)); + OMQ_LOG(debug, "proxy establishing new outbound connection to ", to_hex(remote)); std::string addr; bool to_self = false && remote == pubkey; // FIXME; need to use a separate listening socket for this, otherwise we can't easily // tell it wasn't from a remote. @@ -147,15 +147,15 @@ OxenMQ::proxy_connect_sn(std::string_view remote, std::string_view connect_hint, if (addr.empty()) addr = sn_lookup(remote); else - LMQ_LOG(debug, "using connection hint ", connect_hint); + OMQ_LOG(debug, "using connection hint ", connect_hint); if (addr.empty()) { - LMQ_LOG(error, "peer lookup failed for ", to_hex(remote)); + OMQ_LOG(error, "peer lookup failed for ", to_hex(remote)); return {nullptr, ""s}; } } - LMQ_LOG(debug, to_hex(pubkey), " (me) connecting to ", addr, " to reach ", to_hex(remote)); + OMQ_LOG(debug, to_hex(pubkey), " (me) connecting to ", addr, " to reach ", to_hex(remote)); zmq::socket_t socket{context, zmq::socket_type::dealer}; setup_outgoing_socket(socket, remote, use_ephemeral_routing_id); try { @@ -163,7 +163,7 @@ OxenMQ::proxy_connect_sn(std::string_view remote, std::string_view connect_hint, } catch (const zmq::error_t& e) { // Note that this failure cases indicates something serious went wrong that means zmq isn't // even going to try connecting (for example an unparseable remote address). - LMQ_LOG(error, "Outgoing connection to ", addr, " failed: ", e.what()); + OMQ_LOG(error, "Outgoing connection to ", addr, " failed: ", e.what()); return {nullptr, ""s}; } @@ -211,10 +211,10 @@ std::pair OxenMQ::proxy_connect_sn(bt_dict_consume void OxenMQ::proxy_close_connection(int64_t id, std::chrono::milliseconds linger) { auto it = connections.find(id); if (it == connections.end()) { - LMQ_LOG(warn, "internal error: connection to close (", id, ") doesn't exist!"); + OMQ_LOG(warn, "internal error: connection to close (", id, ") doesn't exist!"); return; } - LMQ_LOG(debug, "Closing conn ", id); + OMQ_LOG(debug, "Closing conn ", id); it->second.set(zmq::sockopt::linger, linger > 0ms ? (int) linger.count() : 0); connections.erase(it); connections_updated = true; @@ -228,13 +228,13 @@ void OxenMQ::proxy_expire_idle_peers() { if (info.outgoing()) { auto idle = std::chrono::steady_clock::now() - info.last_activity; if (idle > info.idle_expiry) { - LMQ_LOG(debug, "Closing outgoing connection to ", it->first, ": idle time (", + OMQ_LOG(debug, "Closing outgoing connection to ", it->first, ": idle time (", std::chrono::duration_cast(idle).count(), "ms) reached connection timeout (", info.idle_expiry.count(), "ms)"); proxy_close_connection(info.conn_id, CLOSE_LINGER); it = peers.erase(it); } else { - LMQ_LOG(trace, "Not closing ", it->first, ": ", std::chrono::duration_cast(idle).count(), + OMQ_LOG(trace, "Not closing ", it->first, ": ", std::chrono::duration_cast(idle).count(), "ms <= ", info.idle_expiry.count(), "ms"); ++it; continue; @@ -246,17 +246,17 @@ void OxenMQ::proxy_expire_idle_peers() { } void OxenMQ::proxy_conn_cleanup() { - LMQ_TRACE("starting proxy connections cleanup"); + OMQ_TRACE("starting proxy connections cleanup"); // Drop idle connections (if we haven't done it in a while) - LMQ_TRACE("closing idle connections"); + OMQ_TRACE("closing idle connections"); proxy_expire_idle_peers(); auto now = std::chrono::steady_clock::now(); // FIXME - check other outgoing connections to see if they died and if so purge them - LMQ_TRACE("Timing out pending outgoing connections"); + OMQ_TRACE("Timing out pending outgoing connections"); // Check any pending outgoing connections for timeout for (auto it = pending_connects.begin(); it != pending_connects.end(); ) { auto& pc = *it; @@ -270,12 +270,12 @@ void OxenMQ::proxy_conn_cleanup() { } } - LMQ_TRACE("Timing out pending requests"); + OMQ_TRACE("Timing out pending requests"); // Remove any expired pending requests and schedule their callback with a failure for (auto it = pending_requests.begin(); it != pending_requests.end(); ) { auto& callback = it->second; if (callback.first < now) { - LMQ_LOG(debug, "pending request ", to_hex(it->first), " expired, invoking callback with failure status and removing"); + OMQ_LOG(debug, "pending request ", to_hex(it->first), " expired, invoking callback with failure status and removing"); job([callback = std::move(callback.second)] { callback(false, {{"TIMEOUT"s}}); }); it = pending_requests.erase(it); } else { @@ -283,7 +283,7 @@ void OxenMQ::proxy_conn_cleanup() { } } - LMQ_TRACE("done proxy connections cleanup"); + OMQ_TRACE("done proxy connections cleanup"); }; void OxenMQ::proxy_connect_remote(bt_dict_consumer data) { @@ -318,7 +318,7 @@ void OxenMQ::proxy_connect_remote(bt_dict_consumer data) { if (conn_id == -1 || remote.empty()) throw std::runtime_error("Internal error: CONNECT_REMOTE proxy command missing required 'conn_id' and/or 'remote' value"); - LMQ_LOG(debug, "Establishing remote connection to ", remote, remote_pubkey.empty() ? " (NULL auth)" : " via CURVE expecting pubkey " + to_hex(remote_pubkey)); + OMQ_LOG(debug, "Establishing remote connection to ", remote, remote_pubkey.empty() ? " (NULL auth)" : " via CURVE expecting pubkey " + to_hex(remote_pubkey)); zmq::socket_t sock{context, zmq::socket_type::dealer}; try { @@ -333,7 +333,7 @@ void OxenMQ::proxy_connect_remote(bt_dict_consumer data) { auto &s = connections.emplace_hint(connections.end(), conn_id, std::move(sock))->second; connections_updated = true; - LMQ_LOG(debug, "Opened new zmq socket to ", remote, ", conn_id ", conn_id, "; sending HI"); + OMQ_LOG(debug, "Opened new zmq socket to ", remote, ", conn_id ", conn_id, "; sending HI"); send_direct_message(s, "HI"); pending_connects.emplace_back(conn_id, std::chrono::steady_clock::now() + timeout, std::move(on_connect), std::move(on_failure)); @@ -363,18 +363,18 @@ void OxenMQ::proxy_disconnect(bt_dict_consumer data) { proxy_disconnect(std::move(connid), linger); } void OxenMQ::proxy_disconnect(ConnectionID conn, std::chrono::milliseconds linger) { - LMQ_TRACE("Disconnecting outgoing connection to ", conn); + OMQ_TRACE("Disconnecting outgoing connection to ", conn); auto pr = peers.equal_range(conn); for (auto it = pr.first; it != pr.second; ++it) { auto& peer = it->second; if (peer.outgoing()) { - LMQ_LOG(debug, "Closing outgoing connection to ", conn); + OMQ_LOG(debug, "Closing outgoing connection to ", conn); proxy_close_connection(peer.conn_id, linger); peers.erase(it); return; } } - LMQ_LOG(warn, "Failed to disconnect ", conn, ": no such outgoing connection"); + OMQ_LOG(warn, "Failed to disconnect ", conn, ": no such outgoing connection"); } diff --git a/oxenmq/jobs.cpp b/oxenmq/jobs.cpp index 3334ff6..66c3e6d 100644 --- a/oxenmq/jobs.cpp +++ b/oxenmq/jobs.cpp @@ -7,7 +7,7 @@ namespace oxenmq { void OxenMQ::proxy_batch(detail::Batch* batch) { batches.insert(batch); const auto [jobs, tagged_threads] = batch->size(); - LMQ_TRACE("proxy queuing batch job with ", jobs, " jobs", tagged_threads ? " (job uses tagged thread(s))" : ""); + OMQ_TRACE("proxy queuing batch job with ", jobs, " jobs", tagged_threads ? " (job uses tagged thread(s))" : ""); if (!tagged_threads) { for (size_t i = 0; i < jobs; i++) batch_jobs.emplace(batch, i); @@ -82,19 +82,19 @@ void OxenMQ::proxy_timer(bt_list_consumer timer_data) { void OxenMQ::_queue_timer_job(int timer_id) { auto it = timer_jobs.find(timer_id); if (it == timer_jobs.end()) { - LMQ_LOG(warn, "Could not find timer job ", timer_id); + OMQ_LOG(warn, "Could not find timer job ", timer_id); return; } auto& [func, squelch, running, thread] = it->second; if (squelch && running) { - LMQ_LOG(debug, "Not running timer job ", timer_id, " because a job for that timer is still running"); + OMQ_LOG(debug, "Not running timer job ", timer_id, " because a job for that timer is still running"); return; } if (thread == -1) { // Run directly in proxy thread try { func(); } - catch (const std::exception &e) { LMQ_LOG(warn, "timer job ", timer_id, " raised an exception: ", e.what()); } - catch (...) { LMQ_LOG(warn, "timer job ", timer_id, " raised a non-std exception"); } + catch (const std::exception &e) { OMQ_LOG(warn, "timer job ", timer_id, " raised an exception: ", e.what()); } + catch (...) { OMQ_LOG(warn, "timer job ", timer_id, " raised a non-std exception"); } return; } @@ -104,15 +104,15 @@ void OxenMQ::_queue_timer_job(int timer_id) { running = true; b->completion([this,timer_id](auto results) { try { results[0].get(); } - catch (const std::exception &e) { LMQ_LOG(warn, "timer job ", timer_id, " raised an exception: ", e.what()); } - catch (...) { LMQ_LOG(warn, "timer job ", timer_id, " raised a non-std exception"); } + catch (const std::exception &e) { OMQ_LOG(warn, "timer job ", timer_id, " raised an exception: ", e.what()); } + catch (...) { OMQ_LOG(warn, "timer job ", timer_id, " raised a non-std exception"); } auto it = timer_jobs.find(timer_id); if (it != timer_jobs.end()) it->second.running = false; }, OxenMQ::run_in_proxy); } batches.insert(b); - LMQ_TRACE("b: ", b->size().first, ", ", b->size().second, "; thread = ", thread); + OMQ_TRACE("b: ", b->size().first, ", ", b->size().second, "; thread = ", thread); assert(b->size() == std::make_pair(size_t{1}, thread > 0)); auto& queue = thread > 0 ? std::get>(tagged_workers[thread - 1]) @@ -172,7 +172,7 @@ TaggedThreadID OxenMQ::add_tagged_thread(std::string name, std::function busy = false; run.worker_id = tagged_workers.size(); // We want index + 1 (b/c 0 is used for non-tagged jobs) run.worker_routing_id = "t" + std::to_string(run.worker_id); - LMQ_TRACE("Created new tagged thread ", name, " with routing id ", run.worker_routing_id); + OMQ_TRACE("Created new tagged thread ", name, " with routing id ", run.worker_routing_id); run.worker_thread = std::thread{&OxenMQ::worker_thread, this, run.worker_id, name, std::move(start)}; diff --git a/oxenmq/message.h b/oxenmq/message.h index aa399ad..7c3a667 100644 --- a/oxenmq/message.h +++ b/oxenmq/message.h @@ -19,8 +19,8 @@ public: std::string remote; ///< Some sort of remote address from which the request came. Often "IP" for TCP connections and "localhost:UID:GID:PID" for unix socket connections. /// Constructor - Message(OxenMQ& lmq, ConnectionID cid, Access access, std::string remote) - : oxenmq{lmq}, conn{std::move(cid)}, access{std::move(access)}, remote{std::move(remote)} {} + Message(OxenMQ& omq, ConnectionID cid, Access access, std::string remote) + : oxenmq{omq}, conn{std::move(cid)}, access{std::move(access)}, remote{std::move(remote)} {} // Non-copyable Message(const Message&) = delete; @@ -49,7 +49,7 @@ public: void send_reply(Args&&... args); /// Sends a request back to whomever sent this message. This is effectively a wrapper around - /// lmq.request() that takes care of setting up the recipient arguments. + /// omq.request() that takes care of setting up the recipient arguments. template void send_request(std::string_view command, ReplyCallback&& callback, Args&&... args); diff --git a/oxenmq/oxenmq-internal.h b/oxenmq/oxenmq-internal.h index f43a8ac..f737580 100644 --- a/oxenmq/oxenmq-internal.h +++ b/oxenmq/oxenmq-internal.h @@ -2,15 +2,15 @@ #include "oxenmq.h" // Inside some method: -// LMQ_LOG(warn, "bad ", 42, " stuff"); +// OMQ_LOG(warn, "bad ", 42, " stuff"); // -#define LMQ_LOG(level, ...) log(LogLevel::level, __FILE__, __LINE__, __VA_ARGS__) +#define OMQ_LOG(level, ...) log(LogLevel::level, __FILE__, __LINE__, __VA_ARGS__) #ifndef NDEBUG -// Same as LMQ_LOG(trace, ...) when not doing a release build; nothing under a release build. -# define LMQ_TRACE(...) log(LogLevel::trace, __FILE__, __LINE__, __VA_ARGS__) +// Same as OMQ_LOG(trace, ...) when not doing a release build; nothing under a release build. +# define OMQ_TRACE(...) log(LogLevel::trace, __FILE__, __LINE__, __VA_ARGS__) #else -# define LMQ_TRACE(...) +# define OMQ_TRACE(...) #endif namespace oxenmq { diff --git a/oxenmq/oxenmq.cpp b/oxenmq/oxenmq.cpp index ef802d9..e53f499 100644 --- a/oxenmq/oxenmq.cpp +++ b/oxenmq/oxenmq.cpp @@ -199,7 +199,7 @@ OxenMQ::OxenMQ( sn_lookup{std::move(lookup)}, log_lvl{level}, logger{std::move(logger)} { - LMQ_TRACE("Constructing OxenMQ, id=", object_id, ", this=", this); + OMQ_TRACE("Constructing OxenMQ, id=", object_id, ", this=", this); if (sodium_init() == -1) throw std::runtime_error{"libsodium initialization failed"}; @@ -209,7 +209,7 @@ OxenMQ::OxenMQ( } else if (pubkey.empty()) { if (service_node) throw std::invalid_argument("Cannot construct a service node mode OxenMQ without a keypair"); - LMQ_LOG(debug, "generating x25519 keypair for remote-only OxenMQ instance"); + OMQ_LOG(debug, "generating x25519 keypair for remote-only OxenMQ instance"); pubkey.resize(crypto_box_PUBLICKEYBYTES); privkey.resize(crypto_box_SECRETKEYBYTES); crypto_box_keypair(reinterpret_cast(&pubkey[0]), reinterpret_cast(&privkey[0])); @@ -232,13 +232,13 @@ void OxenMQ::start() { if (proxy_thread.joinable()) throw std::logic_error("Cannot call start() multiple times!"); - LMQ_LOG(info, "Initializing OxenMQ ", bind.empty() ? "remote-only" : "listener", " with pubkey ", to_hex(pubkey)); + OMQ_LOG(info, "Initializing OxenMQ ", bind.empty() ? "remote-only" : "listener", " with pubkey ", to_hex(pubkey)); int zmq_socket_limit = context.get(zmq::ctxopt::socket_limit); if (MAX_SOCKETS > 1 && MAX_SOCKETS <= zmq_socket_limit) context.set(zmq::ctxopt::max_sockets, MAX_SOCKETS); else - LMQ_LOG(error, "Not applying OxenMQ::MAX_SOCKETS setting: ", MAX_SOCKETS, " must be in [1, ", zmq_socket_limit, "]"); + OMQ_LOG(error, "Not applying OxenMQ::MAX_SOCKETS setting: ", MAX_SOCKETS, " must be in [1, ", zmq_socket_limit, "]"); // We bind `command` here so that the `get_control_socket()` below is always connecting to a // bound socket, but we do nothing else here: the proxy thread is responsible for everything @@ -246,10 +246,10 @@ void OxenMQ::start() { command.bind(SN_ADDR_COMMAND); proxy_thread = std::thread{&OxenMQ::proxy_loop, this}; - LMQ_LOG(debug, "Waiting for proxy thread to get ready..."); + OMQ_LOG(debug, "Waiting for proxy thread to get ready..."); auto &control = get_control_socket(); detail::send_control(control, "START"); - LMQ_TRACE("Sent START command"); + OMQ_TRACE("Sent START command"); zmq::message_t ready_msg; std::vector parts; @@ -258,7 +258,7 @@ void OxenMQ::start() { if (!(parts.size() == 1 && view(parts.front()) == "READY")) throw std::runtime_error("Invalid startup message from proxy thread (didn't get expected READY message)"); - LMQ_LOG(debug, "Proxy thread is ready"); + OMQ_LOG(debug, "Proxy thread is ready"); } void OxenMQ::listen_curve(std::string bind_addr, AllowFunc allow_connection, std::function on_bind) { @@ -286,7 +286,7 @@ void OxenMQ::listen_plain(std::string bind_addr, AllowFunc allow_connection, std std::pair*> OxenMQ::get_command(std::string& command) { if (command.size() > MAX_CATEGORY_LENGTH + 1 + MAX_COMMAND_LENGTH) { - LMQ_LOG(warn, "Invalid command '", command, "': command too long"); + OMQ_LOG(warn, "Invalid command '", command, "': command too long"); return {}; } @@ -298,7 +298,7 @@ std::pair*> Ox auto dot = command.find('.'); if (dot == 0 || dot == std::string::npos) { - LMQ_LOG(warn, "Invalid command '", command, "': expected ."); + OMQ_LOG(warn, "Invalid command '", command, "': expected ."); return {}; } std::string catname = command.substr(0, dot); @@ -306,14 +306,14 @@ std::pair*> Ox auto catit = categories.find(catname); if (catit == categories.end()) { - LMQ_LOG(warn, "Invalid command category '", catname, "'"); + OMQ_LOG(warn, "Invalid command category '", catname, "'"); return {}; } const auto& category = catit->second; auto callback_it = category.commands.find(cmd); if (callback_it == category.commands.end()) { - LMQ_LOG(warn, "Invalid command '", command, "'"); + OMQ_LOG(warn, "Invalid command '", command, "'"); return {}; } @@ -416,10 +416,10 @@ OxenMQ::~OxenMQ() { return; } - LMQ_LOG(info, "OxenMQ shutting down proxy thread"); + OMQ_LOG(info, "OxenMQ shutting down proxy thread"); detail::send_control(get_control_socket(), "QUIT"); proxy_thread.join(); - LMQ_LOG(info, "OxenMQ proxy thread has stopped"); + OMQ_LOG(info, "OxenMQ proxy thread has stopped"); } std::ostream &operator<<(std::ostream &os, LogLevel lvl) { diff --git a/oxenmq/oxenmq.h b/oxenmq/oxenmq.h index afd4629..b83fcca 100644 --- a/oxenmq/oxenmq.h +++ b/oxenmq/oxenmq.h @@ -680,7 +680,7 @@ private: Access access; std::string remote; - // Normal ctor for an actual lmq command being processed + // Normal ctor for an actual omq command being processed pending_command(category& cat, std::string command, std::vector data_parts, const std::pair* callback, ConnectionID conn, Access access, std::string remote) : cat{cat}, command{std::move(command)}, data_parts{std::move(data_parts)}, @@ -963,7 +963,7 @@ public: * another `set_active_sns()` or a `update_active_sns()` call). It *is* possible to make the * initial call after calling `start()`, but that creates a window during which incoming * remote SN connections will be erroneously treated as non-SN connections. - * - If this LMQ instance should accept incoming connections, set up any listening ports via + * - If this OMQ instance should accept incoming connections, set up any listening ports via * `listen_curve()` and/or `listen_plain()`. */ void start(); @@ -1147,13 +1147,13 @@ public: * Example: * * // Send to a SN, connecting to it if we aren't already connected: - * lmq.send(pubkey, "hello.world", "abc", send_option::hint("tcp://localhost:1234"), "def"); + * omq.send(pubkey, "hello.world", "abc", send_option::hint("tcp://localhost:1234"), "def"); * * // Start connecting to a remote and immediately queue a message for it - * auto conn = lmq.connect_remote("tcp://127.0.0.1:1234", + * auto conn = omq.connect_remote("tcp://127.0.0.1:1234", * [](ConnectionID) { std::cout << "connected\n"; }, * [](ConnectionID, string_view why) { std::cout << "connection failed: " << why << \n"; }); - * lmq.send(conn, "hello.world", "abc", "def"); + * omq.send(conn, "hello.world", "abc", "def"); * * Both of these send the command `hello.world` to the given pubkey, containing additional * message parts "abc" and "def". In the first case, if not currently connected, the given @@ -1204,7 +1204,7 @@ public: * @param category - the category name that should handle the request for the purposes of * scheduling the job. The category must have been added using add_category(). The category * can be an actual category with added commands, in which case the injected tasks are queued - * along with LMQ requests for that category, or can have no commands to set up a distinct + * along with OMQ requests for that category, or can have no commands to set up a distinct * category for the injected jobs. * * @param command - a command name; this is mainly used for debugging and does not need to @@ -1326,32 +1326,32 @@ public: /// /// This allows simplifying: /// -/// lmq.add_category("foo", ...); -/// lmq.add_command("foo", "a", ...); -/// lmq.add_command("foo", "b", ...); -/// lmq.add_request_command("foo", "c", ...); +/// omq.add_category("foo", ...); +/// omq.add_command("foo", "a", ...); +/// omq.add_command("foo", "b", ...); +/// omq.add_request_command("foo", "c", ...); /// /// to: /// -/// lmq.add_category("foo", ...) +/// omq.add_category("foo", ...) /// .add_command("a", ...) /// .add_command("b", ...) /// .add_request_command("b", ...) /// ; class CatHelper { - OxenMQ& lmq; + OxenMQ& omq; std::string cat; public: - CatHelper(OxenMQ& lmq, std::string cat) : lmq{lmq}, cat{std::move(cat)} {} + CatHelper(OxenMQ& omq, std::string cat) : omq{omq}, cat{std::move(cat)} {} CatHelper& add_command(std::string name, OxenMQ::CommandCallback callback) { - lmq.add_command(cat, std::move(name), std::move(callback)); + omq.add_command(cat, std::move(name), std::move(callback)); return *this; } CatHelper& add_request_command(std::string name, OxenMQ::CommandCallback callback) { - lmq.add_request_command(cat, std::move(name), std::move(callback)); + omq.add_request_command(cat, std::move(name), std::move(callback)); return *this; } }; diff --git a/oxenmq/proxy.cpp b/oxenmq/proxy.cpp index 1bae0fd..8e5f7b5 100644 --- a/oxenmq/proxy.cpp +++ b/oxenmq/proxy.cpp @@ -20,7 +20,7 @@ extern "C" { namespace oxenmq { void OxenMQ::proxy_quit() { - LMQ_LOG(debug, "Received quit command, shutting down proxy thread"); + OMQ_LOG(debug, "Received quit command, shutting down proxy thread"); assert(std::none_of(workers.begin(), workers.end(), [](auto& worker) { return worker.worker_thread.joinable(); })); assert(std::none_of(tagged_workers.begin(), tagged_workers.end(), [](auto& worker) { return std::get<0>(worker).worker_thread.joinable(); })); @@ -38,7 +38,7 @@ void OxenMQ::proxy_quit() { connections.clear(); peers.clear(); - LMQ_LOG(debug, "Proxy thread teardown complete"); + OMQ_LOG(debug, "Proxy thread teardown complete"); } void OxenMQ::proxy_send(bt_dict_consumer data) { @@ -120,10 +120,10 @@ void OxenMQ::proxy_send(bt_dict_consumer data) { if (!sock_route.first) { nowarn = true; if (optional) - LMQ_LOG(debug, "Not sending: send is optional and no connection to ", + OMQ_LOG(debug, "Not sending: send is optional and no connection to ", to_hex(conn_id.pk), " is currently established"); else - LMQ_LOG(error, "Unable to send to ", to_hex(conn_id.pk), ": no valid connection address found"); + OMQ_LOG(error, "Unable to send to ", to_hex(conn_id.pk), ": no valid connection address found"); break; } send_to = sock_route.first; @@ -131,20 +131,20 @@ void OxenMQ::proxy_send(bt_dict_consumer data) { } else if (!conn_id.route.empty()) { // incoming non-SN connection auto it = connections.find(conn_id.id); if (it == connections.end()) { - LMQ_LOG(warn, "Unable to send to ", conn_id, ": incoming listening socket not found"); + OMQ_LOG(warn, "Unable to send to ", conn_id, ": incoming listening socket not found"); break; } send_to = &it->second; } else { auto pr = peers.equal_range(conn_id); if (pr.first == peers.end()) { - LMQ_LOG(warn, "Unable to send: connection id ", conn_id, " is not (or is no longer) a valid outgoing connection"); + OMQ_LOG(warn, "Unable to send: connection id ", conn_id, " is not (or is no longer) a valid outgoing connection"); break; } auto& peer = pr.first->second; auto it = connections.find(peer.conn_id); if (it == connections.end()) { - LMQ_LOG(warn, "Unable to send: peer connection id ", conn_id, " is not (or is no longer) a valid outgoing connection"); + OMQ_LOG(warn, "Unable to send: peer connection id ", conn_id, " is not (or is no longer) a valid outgoing connection"); break; } send_to = &it->second; @@ -155,7 +155,7 @@ void OxenMQ::proxy_send(bt_dict_consumer data) { } catch (const zmq::error_t &e) { if (e.num() == EHOSTUNREACH && !conn_id.route.empty() /*= incoming conn*/) { - LMQ_LOG(debug, "Incoming connection is no longer valid; removing peer details"); + OMQ_LOG(debug, "Incoming connection is no longer valid; removing peer details"); auto pr = peers.equal_range(conn_id); if (pr.first != peers.end()) { @@ -174,7 +174,7 @@ void OxenMQ::proxy_send(bt_dict_consumer data) { // The incoming connection to the SN is no longer good, but we can retry because // we may have another active connection with the SN (or may want to open one). if (removed) { - LMQ_LOG(debug, "Retrying sending to SN ", to_hex(conn_id.pk), " using other sockets"); + OMQ_LOG(debug, "Retrying sending to SN ", to_hex(conn_id.pk), " using other sockets"); retry = true; } } @@ -182,10 +182,10 @@ void OxenMQ::proxy_send(bt_dict_consumer data) { } if (!retry) { if (!conn_id.sn() && !conn_id.route.empty()) { // incoming non-SN connection - LMQ_LOG(debug, "Unable to send message to incoming connection ", conn_id, ": ", e.what(), + OMQ_LOG(debug, "Unable to send message to incoming connection ", conn_id, ": ", e.what(), "; remote has probably disconnected"); } else { - LMQ_LOG(warn, "Unable to send message to ", conn_id, ": ", e.what()); + OMQ_LOG(warn, "Unable to send message to ", conn_id, ": ", e.what()); } nowarn = true; if (callback_nosend) { @@ -197,11 +197,11 @@ void OxenMQ::proxy_send(bt_dict_consumer data) { } if (request) { if (sent) { - LMQ_LOG(debug, "Added new pending request ", to_hex(request_tag)); + OMQ_LOG(debug, "Added new pending request ", to_hex(request_tag)); pending_requests.insert({ request_tag, { std::chrono::steady_clock::now() + request_timeout, std::move(request_callback) }}); } else { - LMQ_LOG(debug, "Could not send request, scheduling request callback failure"); + OMQ_LOG(debug, "Could not send request, scheduling request callback failure"); job([callback = std::move(request_callback)] { callback(false, {{"TIMEOUT"s}}); }); } } @@ -211,7 +211,7 @@ void OxenMQ::proxy_send(bt_dict_consumer data) { else if (callback_noqueue) job(std::move(callback_noqueue)); else if (!nowarn) - LMQ_LOG(warn, "Unable to send message to ", conn_id, ": sending would block"); + OMQ_LOG(warn, "Unable to send message to ", conn_id, ": sending would block"); } } @@ -238,7 +238,7 @@ void OxenMQ::proxy_reply(bt_dict_consumer data) { auto pr = peers.equal_range(conn_id); if (pr.first == pr.second) { - LMQ_LOG(warn, "Unable to send tagged reply: the connection is no longer valid"); + OMQ_LOG(warn, "Unable to send tagged reply: the connection is no longer valid"); return; } @@ -251,18 +251,18 @@ void OxenMQ::proxy_reply(bt_dict_consumer data) { } catch (const zmq::error_t &err) { if (err.num() == EHOSTUNREACH) { if (it->second.outgoing()) { - LMQ_LOG(debug, "Unable to send reply to non-SN request on outgoing socket: " + OMQ_LOG(debug, "Unable to send reply to non-SN request on outgoing socket: " "remote is no longer connected; closing connection"); proxy_close_connection(it->second.conn_id, CLOSE_LINGER); it = peers.erase(it); ++it; } else { - LMQ_LOG(debug, "Unable to send reply to non-SN request on incoming socket: " + OMQ_LOG(debug, "Unable to send reply to non-SN request on incoming socket: " "remote is no longer connected; removing peer details"); it = peers.erase(it); } } else { - LMQ_LOG(warn, "Unable to send reply to incoming non-SN request: ", err.what()); + OMQ_LOG(warn, "Unable to send reply to incoming non-SN request: ", err.what()); ++it; } } @@ -275,22 +275,22 @@ void OxenMQ::proxy_control_message(std::vector& parts) { if (parts.size() < 2) throw std::logic_error("OxenMQ bug: Expected 2-3 message parts for a proxy control message"); auto route = view(parts[0]), cmd = view(parts[1]); - LMQ_TRACE("control message: ", cmd); + OMQ_TRACE("control message: ", cmd); if (parts.size() == 3) { - LMQ_TRACE("...: ", parts[2]); + OMQ_TRACE("...: ", parts[2]); auto data = view(parts[2]); if (cmd == "SEND") { - LMQ_TRACE("proxying message"); + OMQ_TRACE("proxying message"); return proxy_send(data); } else if (cmd == "REPLY") { - LMQ_TRACE("proxying reply to non-SN incoming message"); + OMQ_TRACE("proxying reply to non-SN incoming message"); return proxy_reply(data); } else if (cmd == "BATCH") { - LMQ_TRACE("proxy batch jobs"); + OMQ_TRACE("proxy batch jobs"); auto ptrval = bt_deserialize(data); return proxy_batch(reinterpret_cast(ptrval)); } else if (cmd == "INJECT") { - LMQ_TRACE("proxy inject"); + OMQ_TRACE("proxy inject"); return proxy_inject_task(detail::deserialize_object(bt_deserialize(data))); } else if (cmd == "SET_SNS") { return proxy_set_active_sns(data); @@ -351,11 +351,11 @@ bool OxenMQ::proxy_bind(bind_data& b, size_t bind_index) { b.on_bind = nullptr; } if (!good) { - LMQ_LOG(warn, "OxenMQ failed to listen on ", b.address); + OMQ_LOG(warn, "OxenMQ failed to listen on ", b.address); return false; } - LMQ_LOG(info, "OxenMQ listening on ", b.address); + OMQ_LOG(info, "OxenMQ listening on ", b.address); b.conn_id = next_conn_id++; connections.emplace_hint(connections.end(), b.conn_id, std::move(listener)); @@ -368,11 +368,11 @@ bool OxenMQ::proxy_bind(bind_data& b, size_t bind_index) { void OxenMQ::proxy_loop() { #if defined(__linux__) || defined(__sun) || defined(__MINGW32__) - pthread_setname_np(pthread_self(), "lmq-proxy"); + pthread_setname_np(pthread_self(), "omq-proxy"); #elif defined(__FreeBSD__) || defined(__NetBSD__) || defined(__OpenBSD__) - pthread_set_name_np(pthread_self(), "lmq-proxy"); + pthread_set_name_np(pthread_self(), "omq-proxy"); #elif defined(__MACH__) - pthread_setname_np("lmq-proxy"); + pthread_setname_np("omq-proxy"); #endif zap_auth.set(zmq::sockopt::linger, 0); @@ -393,12 +393,12 @@ void OxenMQ::proxy_loop() { } if (log_level() >= LogLevel::debug) { - LMQ_LOG(debug, "Reserving space for ", max_workers, " max workers = ", general_workers, " general plus reservations for:"); + OMQ_LOG(debug, "Reserving space for ", max_workers, " max workers = ", general_workers, " general plus reservations for:"); for (const auto& cat : categories) - LMQ_LOG(debug, " - ", cat.first, ": ", cat.second.reserved_threads); - LMQ_LOG(debug, " - (batch jobs): ", batch_jobs_reserved); - LMQ_LOG(debug, " - (reply jobs): ", reply_jobs_reserved); - LMQ_LOG(debug, "Plus ", tagged_workers.size(), " tagged worker threads"); + OMQ_LOG(debug, " - ", cat.first, ": ", cat.second.reserved_threads); + OMQ_LOG(debug, " - (batch jobs): ", batch_jobs_reserved); + OMQ_LOG(debug, " - (reply jobs): ", reply_jobs_reserved); + OMQ_LOG(debug, "Plus ", tagged_workers.size(), " tagged worker threads"); } workers.reserve(max_workers); @@ -420,7 +420,7 @@ void OxenMQ::proxy_loop() { for (size_t i = 0; i < bind.size(); i++) { if (!proxy_bind(bind[i], i)) { - LMQ_LOG(warn, "OxenMQ failed to listen on ", bind[i].address); + OMQ_LOG(warn, "OxenMQ failed to listen on ", bind[i].address); throw zmq::error_t{}; } } @@ -467,25 +467,25 @@ void OxenMQ::proxy_loop() { // and send them back a "START" to let them know to go ahead with startup. We need this // synchronization dance to guarantee that the workers are routable before we can proceed. if (!tagged_workers.empty()) { - LMQ_LOG(debug, "Waiting for tagged workers"); + OMQ_LOG(debug, "Waiting for tagged workers"); std::unordered_set waiting_on; for (auto& w : tagged_workers) waiting_on.emplace(std::get(w).worker_routing_id); for (; !waiting_on.empty(); parts.clear()) { recv_message_parts(workers_socket, parts); if (parts.size() != 2 || view(parts[1]) != "STARTING"sv) { - LMQ_LOG(error, "Received invalid message on worker socket while waiting for tagged thread startup"); + OMQ_LOG(error, "Received invalid message on worker socket while waiting for tagged thread startup"); continue; } - LMQ_LOG(debug, "Received STARTING message from ", view(parts[0])); + OMQ_LOG(debug, "Received STARTING message from ", view(parts[0])); if (auto it = waiting_on.find(view(parts[0])); it != waiting_on.end()) waiting_on.erase(it); else - LMQ_LOG(error, "Received STARTING message from unknown worker ", view(parts[0])); + OMQ_LOG(error, "Received STARTING message from unknown worker ", view(parts[0])); } for (auto&w : tagged_workers) { - LMQ_LOG(debug, "Telling tagged thread worker ", std::get(w).worker_routing_id, " to finish startup"); + OMQ_LOG(debug, "Telling tagged thread worker ", std::get(w).worker_routing_id, " to finish startup"); route_control(workers_socket, std::get(w).worker_routing_id, "START"); } } @@ -509,7 +509,7 @@ void OxenMQ::proxy_loop() { if (proxy_skip_one_poll) proxy_skip_one_poll = false; else { - LMQ_TRACE("polling for new messages"); + OMQ_TRACE("polling for new messages"); // We poll the control socket and worker socket for any incoming messages. If we have // available worker room then also poll incoming connections and outgoing connections @@ -518,30 +518,30 @@ void OxenMQ::proxy_loop() { zmq::poll(pollitems.data(), pollitems.size(), poll_timeout); } - LMQ_TRACE("processing control messages"); + OMQ_TRACE("processing control messages"); // Retrieve any waiting incoming control messages for (parts.clear(); recv_message_parts(command, parts, zmq::recv_flags::dontwait); parts.clear()) { proxy_control_message(parts); } - LMQ_TRACE("processing worker messages"); + OMQ_TRACE("processing worker messages"); for (parts.clear(); recv_message_parts(workers_socket, parts, zmq::recv_flags::dontwait); parts.clear()) { proxy_worker_message(parts); } - LMQ_TRACE("processing timers"); + OMQ_TRACE("processing timers"); zmq_timers_execute(timers.get()); // Handle any zap authentication - LMQ_TRACE("processing zap requests"); + OMQ_TRACE("processing zap requests"); process_zap_requests(); // See if we can drain anything from the current queue before we potentially add to it // below. - LMQ_TRACE("processing queued jobs and messages"); + OMQ_TRACE("processing queued jobs and messages"); proxy_process_queue(); - LMQ_TRACE("processing new incoming messages"); + OMQ_TRACE("processing new incoming messages"); // We round-robin connections when pulling off pending messages one-by-one rather than // pulling off all messages from one connection before moving to the next; thus in cases of @@ -566,7 +566,7 @@ void OxenMQ::proxy_loop() { ++end %= queue.size(); if (parts.empty()) { - LMQ_LOG(warn, "Ignoring empty (0-part) incoming message"); + OMQ_LOG(warn, "Ignoring empty (0-part) incoming message"); continue; } @@ -576,12 +576,12 @@ void OxenMQ::proxy_loop() { if (connections_updated) { // If connections got updated then our points are stale, to restart the proxy loop; // if there are still messages waiting we'll end up right back here. - LMQ_TRACE("connections became stale; short-circuiting incoming message loop"); + OMQ_TRACE("connections became stale; short-circuiting incoming message loop"); break; } } - LMQ_TRACE("done proxy loop"); + OMQ_TRACE("done proxy loop"); } } @@ -597,7 +597,7 @@ bool OxenMQ::proxy_handle_builtin(int64_t conn_id, zmq::socket_t& sock, std::vec std::string_view route, cmd; if (parts.size() < 1 + incoming) { - LMQ_LOG(warn, "Received empty message; ignoring"); + OMQ_LOG(warn, "Received empty message; ignoring"); return true; } if (incoming) { @@ -606,18 +606,18 @@ bool OxenMQ::proxy_handle_builtin(int64_t conn_id, zmq::socket_t& sock, std::vec } else { cmd = view(parts[0]); } - LMQ_TRACE("Checking for builtins: '", cmd, "' from ", peer_address(parts.back())); + OMQ_TRACE("Checking for builtins: '", cmd, "' from ", peer_address(parts.back())); if (cmd == "REPLY") { size_t tag_pos = 1 + incoming; if (parts.size() <= tag_pos) { - LMQ_LOG(warn, "Received REPLY without a reply tag; ignoring"); + OMQ_LOG(warn, "Received REPLY without a reply tag; ignoring"); return true; } std::string reply_tag{view(parts[tag_pos])}; auto it = pending_requests.find(reply_tag); if (it != pending_requests.end()) { - LMQ_LOG(debug, "Received REPLY for pending command ", to_hex(reply_tag), "; scheduling callback"); + OMQ_LOG(debug, "Received REPLY for pending command ", to_hex(reply_tag), "; scheduling callback"); std::vector data; data.reserve(parts.size() - (tag_pos + 1)); for (auto it = parts.begin() + (tag_pos + 1); it != parts.end(); ++it) @@ -627,38 +627,38 @@ bool OxenMQ::proxy_handle_builtin(int64_t conn_id, zmq::socket_t& sock, std::vec }); pending_requests.erase(it); } else { - LMQ_LOG(warn, "Received REPLY with unknown or already handled reply tag (", to_hex(reply_tag), "); ignoring"); + OMQ_LOG(warn, "Received REPLY with unknown or already handled reply tag (", to_hex(reply_tag), "); ignoring"); } return true; } else if (cmd == "HI") { if (!incoming) { - LMQ_LOG(warn, "Got invalid 'HI' message on an outgoing connection; ignoring"); + OMQ_LOG(warn, "Got invalid 'HI' message on an outgoing connection; ignoring"); return true; } - LMQ_LOG(debug, "Incoming client from ", peer_address(parts.back()), " sent HI, replying with HELLO"); + OMQ_LOG(debug, "Incoming client from ", peer_address(parts.back()), " sent HI, replying with HELLO"); try { send_routed_message(sock, std::string{route}, "HELLO"); - } catch (const std::exception &e) { LMQ_LOG(warn, "Couldn't reply with HELLO: ", e.what()); } + } catch (const std::exception &e) { OMQ_LOG(warn, "Couldn't reply with HELLO: ", e.what()); } return true; } else if (cmd == "HELLO") { if (incoming) { - LMQ_LOG(warn, "Got invalid 'HELLO' message on an incoming connection; ignoring"); + OMQ_LOG(warn, "Got invalid 'HELLO' message on an incoming connection; ignoring"); return true; } auto it = std::find_if(pending_connects.begin(), pending_connects.end(), [&](auto& pc) { return std::get(pc) == conn_id; }); if (it == pending_connects.end()) { - LMQ_LOG(warn, "Got invalid 'HELLO' message on an already handshaked incoming connection; ignoring"); + OMQ_LOG(warn, "Got invalid 'HELLO' message on an already handshaked incoming connection; ignoring"); return true; } auto& pc = *it; auto pit = peers.find(std::get(pc)); if (pit == peers.end()) { - LMQ_LOG(warn, "Got invalid 'HELLO' message with invalid conn_id; ignoring"); + OMQ_LOG(warn, "Got invalid 'HELLO' message with invalid conn_id; ignoring"); return true; } - LMQ_LOG(debug, "Got initial HELLO server response from ", peer_address(parts.back())); + OMQ_LOG(debug, "Got initial HELLO server response from ", peer_address(parts.back())); proxy_schedule_reply_job([on_success=std::move(std::get(pc)), conn=pit->first] { on_success(conn); @@ -667,10 +667,10 @@ bool OxenMQ::proxy_handle_builtin(int64_t conn_id, zmq::socket_t& sock, std::vec return true; } else if (cmd == "BYE") { if (!incoming) { - LMQ_LOG(debug, "BYE command received; disconnecting from ", peer_address(parts.back())); + OMQ_LOG(debug, "BYE command received; disconnecting from ", peer_address(parts.back())); proxy_close_connection(conn_id, 0s); } else { - LMQ_LOG(warn, "Got invalid 'BYE' command on an incoming socket; ignoring"); + OMQ_LOG(warn, "Got invalid 'BYE' command on an incoming socket; ignoring"); } return true; @@ -688,7 +688,7 @@ bool OxenMQ::proxy_handle_builtin(int64_t conn_id, zmq::socket_t& sock, std::vec // pre-1.1.0 sent just a plain UNKNOWNCOMMAND (without the actual command); this was not // useful, but also this response is *expected* for things 1.0.5 didn't understand, like // FORBIDDEN_SN: so log it only at debug level and move on. - LMQ_LOG(debug, "Received plain UNKNOWNCOMMAND; remote is probably an older oxenmq. Ignoring."); + OMQ_LOG(debug, "Received plain UNKNOWNCOMMAND; remote is probably an older oxenmq. Ignoring."); return true; } @@ -696,16 +696,16 @@ bool OxenMQ::proxy_handle_builtin(int64_t conn_id, zmq::socket_t& sock, std::vec std::string reply_tag{view(parts[2 + incoming])}; auto it = pending_requests.find(reply_tag); if (it != pending_requests.end()) { - LMQ_LOG(debug, "Received ", cmd, " REPLY for pending command ", to_hex(reply_tag), "; scheduling failure callback"); + OMQ_LOG(debug, "Received ", cmd, " REPLY for pending command ", to_hex(reply_tag), "; scheduling failure callback"); proxy_schedule_reply_job([callback=std::move(it->second.second), cmd=std::string{cmd}] { callback(false, {{std::move(cmd)}}); }); pending_requests.erase(it); } else { - LMQ_LOG(warn, "Received REPLY with unknown or already handled reply tag (", to_hex(reply_tag), "); ignoring"); + OMQ_LOG(warn, "Received REPLY with unknown or already handled reply tag (", to_hex(reply_tag), "); ignoring"); } } else { - LMQ_LOG(warn, "Received ", cmd, ':', (parts.size() > 1 + incoming ? view(parts[1 + incoming]) : "(unknown command)"sv), + OMQ_LOG(warn, "Received ", cmd, ':', (parts.size() > 1 + incoming ? view(parts[1 + incoming]) : "(unknown command)"sv), " from ", peer_address(parts.back())); } return true; diff --git a/oxenmq/worker.cpp b/oxenmq/worker.cpp index 1e71629..54fe08c 100644 --- a/oxenmq/worker.cpp +++ b/oxenmq/worker.cpp @@ -17,29 +17,29 @@ namespace { // received. If "QUIT" was received, replies with "QUITTING" on the socket and closes it, then // returns false. [[gnu::always_inline]] inline -bool worker_wait_for(OxenMQ& lmq, zmq::socket_t& sock, std::vector& parts, const std::string_view worker_id, const std::string_view expect) { +bool worker_wait_for(OxenMQ& omq, zmq::socket_t& sock, std::vector& parts, const std::string_view worker_id, const std::string_view expect) { while (true) { - lmq.log(LogLevel::trace, __FILE__, __LINE__, "worker ", worker_id, " waiting for ", expect); + omq.log(LogLevel::trace, __FILE__, __LINE__, "worker ", worker_id, " waiting for ", expect); parts.clear(); recv_message_parts(sock, parts); if (parts.size() != 1) { - lmq.log(LogLevel::error, __FILE__, __LINE__, "Internal error: worker ", worker_id, " received invalid ", parts.size(), "-part control msg"); + omq.log(LogLevel::error, __FILE__, __LINE__, "Internal error: worker ", worker_id, " received invalid ", parts.size(), "-part control msg"); continue; } auto command = view(parts[0]); if (command == expect) { #ifndef NDEBUG - lmq.log(LogLevel::trace, __FILE__, __LINE__, "Worker ", worker_id, " received waited-for ", expect, " command"); + omq.log(LogLevel::trace, __FILE__, __LINE__, "Worker ", worker_id, " received waited-for ", expect, " command"); #endif return true; } else if (command == "QUIT"sv) { - lmq.log(LogLevel::debug, __FILE__, __LINE__, "Worker ", worker_id, " received QUIT command, shutting down"); + omq.log(LogLevel::debug, __FILE__, __LINE__, "Worker ", worker_id, " received QUIT command, shutting down"); detail::send_control(sock, "QUITTING"); sock.set(zmq::sockopt::linger, 1000); sock.close(); return false; } else { - lmq.log(LogLevel::error, __FILE__, __LINE__, "Internal error: worker ", worker_id, " received invalid command: `", command, "'"); + omq.log(LogLevel::error, __FILE__, __LINE__, "Internal error: worker ", worker_id, " received invalid command: `", command, "'"); } } } @@ -50,7 +50,7 @@ void OxenMQ::worker_thread(unsigned int index, std::optional tagged std::string routing_id = (tagged ? "t" : "w") + std::to_string(index); // for routing std::string_view worker_id{tagged ? *tagged : routing_id}; // for debug - [[maybe_unused]] std::string thread_name = tagged.value_or("lmq-" + routing_id); + [[maybe_unused]] std::string thread_name = tagged.value_or("omq-" + routing_id); #if defined(__linux__) || defined(__sun) || defined(__MINGW32__) if (thread_name.size() > 15) thread_name.resize(15); pthread_setname_np(pthread_self(), thread_name.c_str()); @@ -62,7 +62,7 @@ void OxenMQ::worker_thread(unsigned int index, std::optional tagged zmq::socket_t sock{context, zmq::socket_type::dealer}; sock.set(zmq::sockopt::routing_id, routing_id); - LMQ_LOG(debug, "New worker thread ", worker_id, " (", routing_id, ") started"); + OMQ_LOG(debug, "New worker thread ", worker_id, " (", routing_id, ") started"); sock.connect(SN_ADDR_WORKERS); if (tagged) detail::send_control(sock, "STARTING"); @@ -101,15 +101,15 @@ void OxenMQ::worker_thread(unsigned int index, std::optional tagged if (run.is_batch_job) { auto* batch = var::get(run.to_run); if (run.batch_jobno >= 0) { - LMQ_TRACE("worker thread ", worker_id, " running batch ", batch, "#", run.batch_jobno); + OMQ_TRACE("worker thread ", worker_id, " running batch ", batch, "#", run.batch_jobno); batch->run_job(run.batch_jobno); } else if (run.batch_jobno == -1) { - LMQ_TRACE("worker thread ", worker_id, " running batch ", batch, " completion"); + OMQ_TRACE("worker thread ", worker_id, " running batch ", batch, " completion"); batch->job_completion(); } } else if (run.is_injected) { auto& func = var::get>(run.to_run); - LMQ_TRACE("worker thread ", worker_id, " invoking injected command ", run.command); + OMQ_TRACE("worker thread ", worker_id, " invoking injected command ", run.command); func(); func = nullptr; } else { @@ -118,7 +118,7 @@ void OxenMQ::worker_thread(unsigned int index, std::optional tagged message.remote = std::move(run.remote); message.data.clear(); - LMQ_TRACE("Got incoming command from ", message.remote, "/", message.conn, message.conn.route.empty() ? " (outgoing)" : " (incoming)"); + OMQ_TRACE("Got incoming command from ", message.remote, "/", message.conn, message.conn.route.empty() ? " (outgoing)" : " (incoming)"); auto& [callback, is_request] = *var::get*>(run.to_run); if (is_request) { @@ -130,26 +130,26 @@ void OxenMQ::worker_thread(unsigned int index, std::optional tagged message.data.emplace_back(m.data(), m.size()); } - LMQ_TRACE("worker thread ", worker_id, " invoking ", run.command, " callback with ", message.data.size(), " message parts"); + OMQ_TRACE("worker thread ", worker_id, " invoking ", run.command, " callback with ", message.data.size(), " message parts"); callback(message); } } catch (const bt_deserialize_invalid& e) { - LMQ_LOG(warn, worker_id, " deserialization failed: ", e.what(), "; ignoring request"); + OMQ_LOG(warn, worker_id, " deserialization failed: ", e.what(), "; ignoring request"); } #ifndef BROKEN_APPLE_VARIANT catch (const std::bad_variant_access& e) { - LMQ_LOG(warn, worker_id, " deserialization failed: found unexpected serialized type (", e.what(), "); ignoring request"); + OMQ_LOG(warn, worker_id, " deserialization failed: found unexpected serialized type (", e.what(), "); ignoring request"); } #endif catch (const std::out_of_range& e) { - LMQ_LOG(warn, worker_id, " deserialization failed: invalid data - required field missing (", e.what(), "); ignoring request"); + OMQ_LOG(warn, worker_id, " deserialization failed: invalid data - required field missing (", e.what(), "); ignoring request"); } catch (const std::exception& e) { - LMQ_LOG(warn, worker_id, " caught exception when processing command: ", e.what()); + OMQ_LOG(warn, worker_id, " caught exception when processing command: ", e.what()); } catch (...) { - LMQ_LOG(warn, worker_id, " caught non-standard exception when processing command"); + OMQ_LOG(warn, worker_id, " caught non-standard exception when processing command"); } // Tell the proxy thread that we are ready for another job @@ -177,11 +177,11 @@ OxenMQ::run_info& OxenMQ::get_idle_worker() { void OxenMQ::proxy_worker_message(std::vector& parts) { // Process messages sent by workers if (parts.size() != 2) { - LMQ_LOG(error, "Received send invalid ", parts.size(), "-part message"); + OMQ_LOG(error, "Received send invalid ", parts.size(), "-part message"); return; } auto route = view(parts[0]), cmd = view(parts[1]); - LMQ_TRACE("worker message from ", route); + OMQ_TRACE("worker message from ", route); assert(route.size() >= 2 && (route[0] == 'w' || route[0] == 't') && route[1] >= '0' && route[1] <= '9'); bool tagged_worker = route[0] == 't'; std::string_view worker_id_str{&route[1], route.size()-1}; // Chop off the leading "w" (or "t") @@ -192,15 +192,15 @@ void OxenMQ::proxy_worker_message(std::vector& parts) { : worker_id >= workers.size() // regular worker ids are indexed from 0 to N-1 ) ) { - LMQ_LOG(error, "Worker id '", route, "' is invalid, unable to process worker command"); + OMQ_LOG(error, "Worker id '", route, "' is invalid, unable to process worker command"); return; } auto& run = tagged_worker ? std::get(tagged_workers[worker_id - 1]) : workers[worker_id]; - LMQ_TRACE("received ", cmd, " command from ", route); + OMQ_TRACE("received ", cmd, " command from ", route); if (cmd == "RAN"sv) { - LMQ_TRACE("Worker ", route, " finished ", run.is_batch_job ? "batch job" : run.command); + OMQ_TRACE("Worker ", route, " finished ", run.is_batch_job ? "batch job" : run.command); if (run.is_batch_job) { if (tagged_worker) { std::get(tagged_workers[worker_id - 1]) = false; @@ -218,15 +218,15 @@ void OxenMQ::proxy_worker_message(std::vector& parts) { auto [state, thread] = batch->job_finished(); if (state == detail::BatchState::complete) { if (thread == -1) { // run directly in proxy - LMQ_TRACE("Completion job running directly in proxy"); + OMQ_TRACE("Completion job running directly in proxy"); try { batch->job_completion(); // RUN DIRECTLY IN PROXY THREAD } catch (const std::exception &e) { // Raise these to error levels: the caller really shouldn't be doing // anything non-trivial in an in-proxy completion function! - LMQ_LOG(error, "proxy thread caught exception when processing in-proxy completion command: ", e.what()); + OMQ_LOG(error, "proxy thread caught exception when processing in-proxy completion command: ", e.what()); } catch (...) { - LMQ_LOG(error, "proxy thread caught non-standard exception when processing in-proxy completion command"); + OMQ_LOG(error, "proxy thread caught non-standard exception when processing in-proxy completion command"); } clear_job = true; } else { @@ -255,16 +255,16 @@ void OxenMQ::proxy_worker_message(std::vector& parts) { run.cat->active_threads--; } if (max_workers == 0) { // Shutting down - LMQ_TRACE("Telling worker ", route, " to quit"); + OMQ_TRACE("Telling worker ", route, " to quit"); route_control(workers_socket, route, "QUIT"); } else if (!tagged_worker) { idle_workers.push_back(worker_id); } } else if (cmd == "QUITTING"sv) { run.worker_thread.join(); - LMQ_LOG(debug, "Worker ", route, " exited normally"); + OMQ_LOG(debug, "Worker ", route, " exited normally"); } else { - LMQ_LOG(error, "Worker ", route, " sent unknown control message: `", cmd, "'"); + OMQ_LOG(error, "Worker ", route, " sent unknown control message: `", cmd, "'"); } } @@ -289,7 +289,7 @@ void OxenMQ::proxy_to_worker(int64_t conn_id, zmq::socket_t& sock, std::vectorsecond; @@ -348,12 +348,12 @@ void OxenMQ::proxy_to_worker(int64_t conn_id, zmq::socket_t& sock, std::vector= category.reserved_threads && active_workers() >= general_workers) { // No free reserved or general spots, try to queue it for later if (category.max_queue >= 0 && category.queued >= category.max_queue) { - LMQ_LOG(warn, "No space to queue incoming command ", command, "; already have ", category.queued, + OMQ_LOG(warn, "No space to queue incoming command ", command, "; already have ", category.queued, "commands queued in that category (max ", category.max_queue, "); dropping message"); return; } - LMQ_LOG(debug, "No available free workers, queuing ", command, " for later"); + OMQ_LOG(debug, "No available free workers, queuing ", command, " for later"); ConnectionID conn{peer->service_node ? ConnectionID::SN_ID : conn_id, peer->pubkey, std::move(tmp_peer.route)}; pending_commands.emplace_back(category, std::move(command), std::move(data_parts), cat_call.second, std::move(conn), std::move(access), peer_address(parts[command_part_index])); @@ -362,7 +362,7 @@ void OxenMQ::proxy_to_worker(int64_t conn_id, zmq::socket_t& sock, std::vectorsecond /*is_request*/ && data_parts.empty()) { - LMQ_LOG(warn, "Received an invalid request command with no reply tag; dropping message"); + OMQ_LOG(warn, "Received an invalid request command with no reply tag; dropping message"); return; } @@ -379,7 +379,7 @@ void OxenMQ::proxy_to_worker(int64_t conn_id, zmq::socket_t& sock, std::vectoractivity(); // outgoing connection activity, pump the activity timer - LMQ_TRACE("Forwarding incoming ", run.command, " from ", run.conn, " @ ", peer_address(parts[command_part_index]), + OMQ_TRACE("Forwarding incoming ", run.command, " from ", run.conn, " @ ", peer_address(parts[command_part_index]), " to worker ", run.worker_routing_id); proxy_run_worker(run); @@ -400,18 +400,18 @@ void OxenMQ::proxy_inject_task(injected_task task) { if (category.active_threads >= category.reserved_threads && active_workers() >= general_workers) { // No free worker slot, queue for later if (category.max_queue >= 0 && category.queued >= category.max_queue) { - LMQ_LOG(warn, "No space to queue injected task ", task.command, "; already have ", category.queued, + OMQ_LOG(warn, "No space to queue injected task ", task.command, "; already have ", category.queued, "commands queued in that category (max ", category.max_queue, "); dropping task"); return; } - LMQ_LOG(debug, "No available free workers for injected task ", task.command, "; queuing for later"); + OMQ_LOG(debug, "No available free workers for injected task ", task.command, "; queuing for later"); pending_commands.emplace_back(category, std::move(task.command), std::move(task.callback), std::move(task.remote)); category.queued++; return; } auto& run = get_idle_worker(); - LMQ_TRACE("Forwarding incoming injected task ", task.command, " from ", task.remote, " to worker ", run.worker_routing_id); + OMQ_TRACE("Forwarding incoming injected task ", task.command, " from ", task.remote, " to worker ", run.worker_routing_id); run.load(&category, std::move(task.command), std::move(task.remote), std::move(task.callback)); proxy_run_worker(run); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 83f5b8a..533fcb5 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -1,7 +1,7 @@ add_subdirectory(Catch2) -set(LMQ_TEST_SRC +add_executable(tests main.cpp test_address.cpp test_batch.cpp @@ -14,9 +14,7 @@ set(LMQ_TEST_SRC test_requests.cpp test_tagged_threads.cpp test_timer.cpp - ) - -add_executable(tests ${LMQ_TEST_SRC}) +) find_package(Threads) diff --git a/tests/test_batch.cpp b/tests/test_batch.cpp index 30ed270..0461e97 100644 --- a/tests/test_batch.cpp +++ b/tests/test_batch.cpp @@ -25,7 +25,7 @@ void continue_big_task(std::vector> results) { done.set_value({sum, exc_count}); } -void start_big_task(oxenmq::OxenMQ& lmq) { +void start_big_task(oxenmq::OxenMQ& omq) { size_t num_jobs = 32; oxenmq::Batch batch; @@ -36,21 +36,21 @@ void start_big_task(oxenmq::OxenMQ& lmq) { batch.completion(&continue_big_task); - lmq.batch(std::move(batch)); + omq.batch(std::move(batch)); } TEST_CASE("batching many small jobs", "[batch-many]") { - oxenmq::OxenMQ lmq{ + oxenmq::OxenMQ omq{ "", "", // generate ephemeral keys false, // not a service node [](auto) { return ""; }, }; - lmq.set_general_threads(4); - lmq.set_batch_threads(4); - lmq.start(); + omq.set_general_threads(4); + omq.set_batch_threads(4); + omq.start(); - start_big_task(lmq); + start_big_task(omq); auto sum = done.get_future().get(); auto lock = catch_lock(); REQUIRE( sum.first == 1337.0 ); @@ -58,14 +58,14 @@ TEST_CASE("batching many small jobs", "[batch-many]") { } TEST_CASE("batch exception propagation", "[batch-exceptions]") { - oxenmq::OxenMQ lmq{ + oxenmq::OxenMQ omq{ "", "", // generate ephemeral keys false, // not a service node [](auto) { return ""; }, }; - lmq.set_general_threads(4); - lmq.set_batch_threads(4); - lmq.start(); + omq.set_general_threads(4); + omq.set_batch_threads(4); + omq.start(); std::promise done_promise; std::future done_future = done_promise.get_future(); @@ -83,7 +83,7 @@ TEST_CASE("batch exception propagation", "[batch-exceptions]") { REQUIRE_THROWS_MATCHES( results[1].get() == 0, std::domain_error, Message("bad value 2") ); done_promise.set_value(); }); - lmq.batch(std::move(batch)); + omq.batch(std::move(batch)); done_future.get(); } @@ -105,7 +105,7 @@ TEST_CASE("batch exception propagation", "[batch-exceptions]") { REQUIRE_THROWS_MATCHES( results[1].get(), std::domain_error, Message("bad value 2") ); done_promise.set_value(); }); - lmq.batch(std::move(batch)); + omq.batch(std::move(batch)); done_future.get(); } @@ -120,7 +120,7 @@ TEST_CASE("batch exception propagation", "[batch-exceptions]") { REQUIRE_THROWS_MATCHES( results[1].get(), std::domain_error, Message("bad value 2") ); done_promise.set_value(); }); - lmq.batch(std::move(batch)); + omq.batch(std::move(batch)); done_future.get(); } } diff --git a/tests/test_connect.cpp b/tests/test_connect.cpp index 84e20e8..95e6a12 100644 --- a/tests/test_connect.cpp +++ b/tests/test_connect.cpp @@ -244,7 +244,7 @@ TEST_CASE("unique connection IDs", "[connect][id]") { TEST_CASE("SN disconnections", "[connect][disconnect]") { - std::vector> lmq; + std::vector> omq; std::vector pubkey, privkey; std::unordered_map conn; REQUIRE(sodium_init() != -1); @@ -258,13 +258,13 @@ TEST_CASE("SN disconnections", "[connect][disconnect]") { } std::atomic his{0}; for (int i = 0; i < pubkey.size(); i++) { - lmq.push_back(std::make_unique( + omq.push_back(std::make_unique( pubkey[i], privkey[i], true, [conn](auto pk) { auto it = conn.find((std::string) pk); if (it != conn.end()) return it->second; return ""s; }, get_logger("S" + std::to_string(i) + "ยป "), LogLevel::trace )); - auto& server = *lmq.back(); + auto& server = *omq.back(); server.listen_curve(conn[pubkey[i]]); server.add_category("sn", Access{AuthLevel::none, true}) @@ -273,12 +273,12 @@ TEST_CASE("SN disconnections", "[connect][disconnect]") { server.start(); } - lmq[0]->send(pubkey[1], "sn.hi"); - lmq[0]->send(pubkey[2], "sn.hi"); - lmq[2]->send(pubkey[0], "sn.hi"); - lmq[2]->send(pubkey[1], "sn.hi"); - lmq[1]->send(pubkey[0], "BYE"); - lmq[0]->send(pubkey[2], "sn.hi"); + omq[0]->send(pubkey[1], "sn.hi"); + omq[0]->send(pubkey[2], "sn.hi"); + omq[2]->send(pubkey[0], "sn.hi"); + omq[2]->send(pubkey[1], "sn.hi"); + omq[1]->send(pubkey[0], "BYE"); + omq[0]->send(pubkey[2], "sn.hi"); std::this_thread::sleep_for(50ms * TIME_DILATION); auto lock = catch_lock(); diff --git a/tests/test_tagged_threads.cpp b/tests/test_tagged_threads.cpp index 7ad0196..708305c 100644 --- a/tests/test_tagged_threads.cpp +++ b/tests/test_tagged_threads.cpp @@ -3,13 +3,13 @@ #include TEST_CASE("tagged thread start functions", "[tagged][start]") { - oxenmq::OxenMQ lmq{get_logger(""), LogLevel::trace}; + oxenmq::OxenMQ omq{get_logger(""), LogLevel::trace}; - lmq.set_general_threads(2); - lmq.set_batch_threads(2); - auto t_abc = lmq.add_tagged_thread("abc"); + omq.set_general_threads(2); + omq.set_batch_threads(2); + auto t_abc = omq.add_tagged_thread("abc"); std::atomic start_called = false; - auto t_def = lmq.add_tagged_thread("def", [&] { start_called = true; }); + auto t_def = omq.add_tagged_thread("def", [&] { start_called = true; }); std::this_thread::sleep_for(20ms); { @@ -17,7 +17,7 @@ TEST_CASE("tagged thread start functions", "[tagged][start]") { REQUIRE_FALSE( start_called ); } - lmq.start(); + omq.start(); wait_for([&] { return start_called.load(); }); { auto lock = catch_lock(); @@ -26,24 +26,24 @@ TEST_CASE("tagged thread start functions", "[tagged][start]") { } TEST_CASE("tagged threads quit-before-start", "[tagged][quit]") { - auto lmq = std::make_unique(get_logger(""), LogLevel::trace); - auto t_abc = lmq->add_tagged_thread("abc"); - REQUIRE_NOTHROW(lmq.reset()); + auto omq = std::make_unique(get_logger(""), LogLevel::trace); + auto t_abc = omq->add_tagged_thread("abc"); + REQUIRE_NOTHROW(omq.reset()); } TEST_CASE("batch jobs to tagged threads", "[tagged][batch]") { - oxenmq::OxenMQ lmq{get_logger(""), LogLevel::trace}; + oxenmq::OxenMQ omq{get_logger(""), LogLevel::trace}; - lmq.set_general_threads(2); - lmq.set_batch_threads(2); + omq.set_general_threads(2); + omq.set_batch_threads(2); std::thread::id id_abc, id_def; - auto t_abc = lmq.add_tagged_thread("abc", [&] { id_abc = std::this_thread::get_id(); }); - auto t_def = lmq.add_tagged_thread("def", [&] { id_def = std::this_thread::get_id(); }); - lmq.start(); + auto t_abc = omq.add_tagged_thread("abc", [&] { id_abc = std::this_thread::get_id(); }); + auto t_def = omq.add_tagged_thread("def", [&] { id_def = std::this_thread::get_id(); }); + omq.start(); std::atomic done = false; std::thread::id id; - lmq.job([&] { id = std::this_thread::get_id(); done = true; }); + omq.job([&] { id = std::this_thread::get_id(); done = true; }); wait_for([&] { return done.load(); }); { auto lock = catch_lock(); @@ -52,7 +52,7 @@ TEST_CASE("batch jobs to tagged threads", "[tagged][batch]") { } done = false; - lmq.job([&] { id = std::this_thread::get_id(); done = true; }, t_abc); + omq.job([&] { id = std::this_thread::get_id(); done = true; }, t_abc); wait_for([&] { return done.load(); }); { auto lock = catch_lock(); @@ -60,7 +60,7 @@ TEST_CASE("batch jobs to tagged threads", "[tagged][batch]") { } done = false; - lmq.job([&] { id = std::this_thread::get_id(); done = true; }, t_def); + omq.job([&] { id = std::this_thread::get_id(); done = true; }, t_def); wait_for([&] { return done.load(); }); { auto lock = catch_lock(); @@ -69,16 +69,16 @@ TEST_CASE("batch jobs to tagged threads", "[tagged][batch]") { std::atomic sleep = true; auto sleeper = [&] { for (int i = 0; sleep && i < 10; i++) { std::this_thread::sleep_for(25ms); } }; - lmq.job(sleeper); - lmq.job(sleeper); + omq.job(sleeper); + omq.job(sleeper); // This one should stall: std::atomic bad = false; - lmq.job([&] { bad = true; }); + omq.job([&] { bad = true; }); std::this_thread::sleep_for(50ms); done = false; - lmq.job([&] { id = std::this_thread::get_id(); done = true; }, t_abc); + omq.job([&] { id = std::this_thread::get_id(); done = true; }, t_abc); wait_for([&] { return done.load(); }); { auto lock = catch_lock(); @@ -90,9 +90,9 @@ TEST_CASE("batch jobs to tagged threads", "[tagged][batch]") { // We can queue up a bunch of jobs which should all happen in order, and all on the abc thread. std::vector v; for (int i = 0; i < 100; i++) { - lmq.job([&] { if (std::this_thread::get_id() == id_abc) v.push_back(v.size()); }, t_abc); + omq.job([&] { if (std::this_thread::get_id() == id_abc) v.push_back(v.size()); }, t_abc); } - lmq.job([&] { done = true; }, t_abc); + omq.job([&] { done = true; }, t_abc); wait_for([&] { return done.load(); }); { auto lock = catch_lock(); @@ -111,13 +111,13 @@ TEST_CASE("batch jobs to tagged threads", "[tagged][batch]") { } TEST_CASE("batch job completion on tagged threads", "[tagged][batch-completion]") { - oxenmq::OxenMQ lmq{get_logger(""), LogLevel::trace}; + oxenmq::OxenMQ omq{get_logger(""), LogLevel::trace}; - lmq.set_general_threads(4); - lmq.set_batch_threads(4); + omq.set_general_threads(4); + omq.set_batch_threads(4); std::thread::id id_abc; - auto t_abc = lmq.add_tagged_thread("abc", [&] { id_abc = std::this_thread::get_id(); }); - lmq.start(); + auto t_abc = omq.add_tagged_thread("abc", [&] { id_abc = std::this_thread::get_id(); }); + omq.start(); oxenmq::Batch batch; for (int i = 1; i < 10; i++) @@ -130,7 +130,7 @@ TEST_CASE("batch job completion on tagged threads", "[tagged][batch-completion]" sum += r.get(); result_sum = std::this_thread::get_id() == id_abc ? sum : -sum; }, t_abc); - lmq.batch(std::move(batch)); + omq.batch(std::move(batch)); wait_for([&] { return result_sum.load() != -1; }); { auto lock = catch_lock(); @@ -140,19 +140,19 @@ TEST_CASE("batch job completion on tagged threads", "[tagged][batch-completion]" TEST_CASE("timer job completion on tagged threads", "[tagged][timer]") { - oxenmq::OxenMQ lmq{get_logger(""), LogLevel::trace}; + oxenmq::OxenMQ omq{get_logger(""), LogLevel::trace}; - lmq.set_general_threads(4); - lmq.set_batch_threads(4); + omq.set_general_threads(4); + omq.set_batch_threads(4); std::thread::id id_abc; - auto t_abc = lmq.add_tagged_thread("abc", [&] { id_abc = std::this_thread::get_id(); }); - lmq.start(); + auto t_abc = omq.add_tagged_thread("abc", [&] { id_abc = std::this_thread::get_id(); }); + omq.start(); std::atomic ticks = 0; std::atomic abc_ticks = 0; - lmq.add_timer([&] { ticks++; }, 10ms); - lmq.add_timer([&] { if (std::this_thread::get_id() == id_abc) abc_ticks++; }, 10ms, true, t_abc); + omq.add_timer([&] { ticks++; }, 10ms); + omq.add_timer([&] { if (std::this_thread::get_id() == id_abc) abc_ticks++; }, 10ms, true, t_abc); wait_for([&] { return ticks.load() > 2 && abc_ticks > 2; }); {