From 5c7f6504d25cd99ad8cc0a39ab3099ecc0cfa27b Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Thu, 12 May 2022 12:15:30 -0300 Subject: [PATCH 01/10] Fix cmake compilation properties For some reason using target_compile_features doesn't properly set up C++17 flags in the generate compile_commands.json, which then breaks clang-complete. Switch to use properties instead, which works. --- CMakeLists.txt | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 41fdebc..b3c0c03 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -140,8 +140,12 @@ if(WARNINGS_AS_ERRORS) target_compile_options(oxenmq PRIVATE -Werror) endif() -target_compile_features(oxenmq PUBLIC cxx_std_17) -set_target_properties(oxenmq PROPERTIES POSITION_INDEPENDENT_CODE ON) +set_target_properties(oxenmq PROPERTIES + POSITION_INDEPENDENT_CODE ON + CXX_STANDARD 17 + CXX_STANDARD_REQUIRED ON + CXX_EXTENSIONS OFF +) function(link_dep_libs target linktype libdirs) foreach(lib ${ARGN}) From 3a5171339678cf24600d923807a2f4a8c695b660 Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Thu, 12 May 2022 12:20:51 -0300 Subject: [PATCH 02/10] Add simpler Job subclass of Batch for simple jobs This adds a much simpler `Job` implementation of `Batch` that is used for simple no-return, no-completion jobs (as are initiated via `omq.job(...)`). This reduces the overhead involved in constructing/destroying the Batch instance for these common jobs. --- oxenmq/batch.h | 46 ++++++++++++++++++++++++++++++++++++++++++++++ oxenmq/jobs.cpp | 22 ++++++++++++---------- oxenmq/oxenmq.h | 1 + 3 files changed, 59 insertions(+), 10 deletions(-) diff --git a/oxenmq/batch.h b/oxenmq/batch.h index 5fbdf18..6c313fc 100644 --- a/oxenmq/batch.h +++ b/oxenmq/batch.h @@ -266,6 +266,52 @@ private: } }; +// Similar to Batch, but doesn't support a completion function and only handles a single task. +class Job final : private detail::Batch { + friend class OxenMQ; +public: + /// Constructs the Job to run a single task. Takes any callable invokable with no arguments and + /// having no return value. The task will be scheduled and run when the next worker thread is + /// available. Any exceptions thrown by the job will be caught and squelched (the exception + /// terminates/completes the job). + + explicit Job(std::function f, std::optional thread = std::nullopt) + : Job{std::move(f), thread ? thread->_id : 0} + { + if (thread && thread->_id == -1) + // There are some special case internal jobs where we allow this, but they use the + // private ctor below that doesn't have this check. + throw std::logic_error{"Cannot add a proxy thread job -- this makes no sense"}; + } + + // movable + Job(Job&&) = default; + Job &operator=(Job&&) = default; + + // non-copyable + Job(const Job&) = delete; + Job &operator=(const Job&) = delete; + +private: + explicit Job(std::function f, int thread_id) + : job{std::move(f), thread_id} {} + + std::pair, int> job; + bool done = false; + + std::pair size() const override { return {1, job.second != 0}; } + std::vector threads() const override { return {job.second}; } + + void run_job(const int /*i*/) override { + try { job.first(); } + catch (...) {} + } + + detail::BatchStatus job_finished() override { return {detail::BatchState::done, 0}; } + + void job_completion() override {} // Never called because we return ::done (not ::complete) above. + +}; template void OxenMQ::batch(Batch&& batch) { diff --git a/oxenmq/jobs.cpp b/oxenmq/jobs.cpp index ca38f54..683055f 100644 --- a/oxenmq/jobs.cpp +++ b/oxenmq/jobs.cpp @@ -29,17 +29,15 @@ void OxenMQ::proxy_batch(detail::Batch* batch) { void OxenMQ::job(std::function f, std::optional thread) { if (thread && thread->_id == -1) throw std::logic_error{"job() cannot be used to queue an in-proxy job"}; - auto* b = new Batch; - b->add_job(std::move(f), thread); - auto* baseptr = static_cast(b); + auto* j = new Job(std::move(f), thread); + auto* baseptr = static_cast(j); detail::send_control(get_control_socket(), "BATCH", oxenc::bt_serialize(reinterpret_cast(baseptr))); } void OxenMQ::proxy_schedule_reply_job(std::function f) { - auto* b = new Batch; - b->add_job(std::move(f)); - batches.insert(b); - reply_jobs.emplace(static_cast(b), 0); + auto* j = new Job(std::move(f)); + reply_jobs.emplace_back(static_cast(j), 0); + batches.insert(j); proxy_skip_one_poll = true; } @@ -98,11 +96,12 @@ void OxenMQ::_queue_timer_job(int timer_id) { return; } - auto* b = new Batch; - b->add_job(func, thread); + detail::Batch* b; if (squelch) { + auto* bv = new Batch; + bv->add_job(func, thread); running = true; - b->completion([this,timer_id](auto results) { + bv->completion([this,timer_id](auto results) { try { results[0].get(); } catch (const std::exception &e) { OMQ_LOG(warn, "timer job ", timer_id, " raised an exception: ", e.what()); } catch (...) { OMQ_LOG(warn, "timer job ", timer_id, " raised a non-std exception"); } @@ -110,6 +109,9 @@ void OxenMQ::_queue_timer_job(int timer_id) { if (it != timer_jobs.end()) it->second.running = false; }, OxenMQ::run_in_proxy); + b = bv; + } else { + b = new Job(func, thread); } batches.insert(b); OMQ_TRACE("b: ", b->size().first, ", ", b->size().second, "; thread = ", thread); diff --git a/oxenmq/oxenmq.h b/oxenmq/oxenmq.h index 3ed1e44..4547f43 100644 --- a/oxenmq/oxenmq.h +++ b/oxenmq/oxenmq.h @@ -106,6 +106,7 @@ private: explicit constexpr TaggedThreadID(int id) : _id{id} {} friend class OxenMQ; template friend class Batch; + friend class Job; }; /// Opaque handler for a timer constructed by add_timer(...). Safe (and cheap) to copy. The only From 371606cde0136c248d2eaf411b60d4a4a7520317 Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Thu, 12 May 2022 12:25:46 -0300 Subject: [PATCH 03/10] Eliminate useless unordered_set I don't know what this set was originally meant to be doing, but it currently does nothing (except adding overhead). The comment says it "owns" the instances but that isn't really true; the instances effectively already manage themselves as they pass the pointer through the communications between proxy and workers. --- oxenmq/jobs.cpp | 3 --- oxenmq/oxenmq.h | 2 -- oxenmq/worker.cpp | 1 - 3 files changed, 6 deletions(-) diff --git a/oxenmq/jobs.cpp b/oxenmq/jobs.cpp index 683055f..b3065ee 100644 --- a/oxenmq/jobs.cpp +++ b/oxenmq/jobs.cpp @@ -5,7 +5,6 @@ namespace oxenmq { void OxenMQ::proxy_batch(detail::Batch* batch) { - batches.insert(batch); const auto [jobs, tagged_threads] = batch->size(); OMQ_TRACE("proxy queuing batch job with ", jobs, " jobs", tagged_threads ? " (job uses tagged thread(s))" : ""); if (!tagged_threads) { @@ -37,7 +36,6 @@ void OxenMQ::job(std::function f, std::optional thread) void OxenMQ::proxy_schedule_reply_job(std::function f) { auto* j = new Job(std::move(f)); reply_jobs.emplace_back(static_cast(j), 0); - batches.insert(j); proxy_skip_one_poll = true; } @@ -113,7 +111,6 @@ void OxenMQ::_queue_timer_job(int timer_id) { } else { b = new Job(func, thread); } - batches.insert(b); OMQ_TRACE("b: ", b->size().first, ", ", b->size().second, "; thread = ", thread); assert(b->size() == std::make_pair(size_t{1}, thread > 0)); auto& queue = thread > 0 diff --git a/oxenmq/oxenmq.h b/oxenmq/oxenmq.h index 4547f43..1b47c33 100644 --- a/oxenmq/oxenmq.h +++ b/oxenmq/oxenmq.h @@ -576,8 +576,6 @@ private: /// weaker (i.e. it cannot reconnect to the SN if the connection is no longer open). void proxy_reply(oxenc::bt_dict_consumer data); - /// Currently active batch/reply jobs; this is the container that owns the Batch instances - std::unordered_set batches; /// Individual batch jobs waiting to run; .second is the 0-n batch number or -1 for the /// completion job using batch_job = std::pair; diff --git a/oxenmq/worker.cpp b/oxenmq/worker.cpp index 222e68a..abab766 100644 --- a/oxenmq/worker.cpp +++ b/oxenmq/worker.cpp @@ -247,7 +247,6 @@ void OxenMQ::proxy_worker_message(std::vector& parts) { } if (clear_job) { - batches.erase(batch); delete batch; run.to_run = static_cast(nullptr); } From fa6de369b2d02eb1ec764d93bc59ba06756e13a6 Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Thu, 12 May 2022 12:32:17 -0300 Subject: [PATCH 04/10] Change std::queue to std::deque typedef This shouldn't make any difference with an optimizing compiler, but makes it easier a bit easier to experiment with different data structures. --- oxenmq/jobs.cpp | 14 +++++++------- oxenmq/oxenmq.h | 7 ++++--- oxenmq/proxy.cpp | 2 +- oxenmq/worker.cpp | 4 ++-- 4 files changed, 14 insertions(+), 13 deletions(-) diff --git a/oxenmq/jobs.cpp b/oxenmq/jobs.cpp index b3065ee..3e056e5 100644 --- a/oxenmq/jobs.cpp +++ b/oxenmq/jobs.cpp @@ -9,16 +9,16 @@ void OxenMQ::proxy_batch(detail::Batch* batch) { OMQ_TRACE("proxy queuing batch job with ", jobs, " jobs", tagged_threads ? " (job uses tagged thread(s))" : ""); if (!tagged_threads) { for (size_t i = 0; i < jobs; i++) - batch_jobs.emplace(batch, i); + batch_jobs.emplace_back(batch, i); } else { // Some (or all) jobs have a specific thread target so queue any such jobs in the tagged // worker queue. auto threads = batch->threads(); for (size_t i = 0; i < jobs; i++) { auto& jobs = threads[i] > 0 - ? std::get>(tagged_workers[threads[i] - 1]) + ? std::get(tagged_workers[threads[i] - 1]) : batch_jobs; - jobs.emplace(batch, i); + jobs.emplace_back(batch, i); } } @@ -39,11 +39,11 @@ void OxenMQ::proxy_schedule_reply_job(std::function f) { proxy_skip_one_poll = true; } -void OxenMQ::proxy_run_batch_jobs(std::queue& jobs, const int reserved, int& active, bool reply) { +void OxenMQ::proxy_run_batch_jobs(batch_queue& jobs, const int reserved, int& active, bool reply) { while (!jobs.empty() && active_workers() < max_workers && (active < reserved || active_workers() < general_workers)) { proxy_run_worker(get_idle_worker().load(std::move(jobs.front()), reply)); - jobs.pop(); + jobs.pop_front(); active++; } } @@ -114,9 +114,9 @@ void OxenMQ::_queue_timer_job(int timer_id) { OMQ_TRACE("b: ", b->size().first, ", ", b->size().second, "; thread = ", thread); assert(b->size() == std::make_pair(size_t{1}, thread > 0)); auto& queue = thread > 0 - ? std::get>(tagged_workers[thread - 1]) + ? std::get(tagged_workers[thread - 1]) : batch_jobs; - queue.emplace(static_cast(b), 0); + queue.emplace_back(static_cast(b), 0); } void OxenMQ::add_timer(TimerID& timer, std::function job, std::chrono::milliseconds interval, bool squelch, std::optional thread) { diff --git a/oxenmq/oxenmq.h b/oxenmq/oxenmq.h index 1b47c33..007d2e2 100644 --- a/oxenmq/oxenmq.h +++ b/oxenmq/oxenmq.h @@ -579,13 +579,14 @@ private: /// Individual batch jobs waiting to run; .second is the 0-n batch number or -1 for the /// completion job using batch_job = std::pair; - std::queue batch_jobs, reply_jobs; + using batch_queue = std::deque; + batch_queue batch_jobs, reply_jobs; int batch_jobs_active = 0; int reply_jobs_active = 0; int batch_jobs_reserved = -1; int reply_jobs_reserved = -1; /// Runs any queued batch jobs - void proxy_run_batch_jobs(std::queue& jobs, int reserved, int& active, bool reply); + void proxy_run_batch_jobs(batch_queue& jobs, int reserved, int& active, bool reply); /// BATCH command. Called with a Batch (see oxenmq/batch.h) object pointer for the proxy to /// take over and queue batch jobs. @@ -768,7 +769,7 @@ private: /// Workers that are reserved for tagged thread tasks (as created with add_tagged_thread). The /// queue here is similar to worker_jobs, but contains only the tagged thread's jobs. The bool /// is whether the worker is currently busy (true) or available (false). - std::vector>> tagged_workers; + std::vector> tagged_workers; public: /** diff --git a/oxenmq/proxy.cpp b/oxenmq/proxy.cpp index 3e63db2..6c08d49 100644 --- a/oxenmq/proxy.cpp +++ b/oxenmq/proxy.cpp @@ -731,7 +731,7 @@ void OxenMQ::proxy_process_queue() { if (!busy && !queue.empty()) { busy = true; proxy_run_worker(run.load(std::move(queue.front()), false, run.worker_id)); - queue.pop(); + queue.pop_front(); } } diff --git a/oxenmq/worker.cpp b/oxenmq/worker.cpp index abab766..168052e 100644 --- a/oxenmq/worker.cpp +++ b/oxenmq/worker.cpp @@ -233,11 +233,11 @@ void OxenMQ::proxy_worker_message(std::vector& parts) { } else { auto& jobs = thread > 0 - ? std::get>(tagged_workers[thread - 1]) // run in tagged thread + ? std::get(tagged_workers[thread - 1]) // run in tagged thread : run.is_reply_job ? reply_jobs : batch_jobs; - jobs.emplace(batch, -1); + jobs.emplace_back(batch, -1); } } else if (state == detail::BatchState::done) { // No completion job From b8e4eb148f7b414c69b5bfbd1de280ef74e7e5f0 Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Thu, 12 May 2022 12:36:33 -0300 Subject: [PATCH 05/10] Use raw index bytes in worker router Change the internal worker routing id to be "w" followed by the raw integer bytes, so that we can just memcpy them into a uint32_t rather than needing to do str -> integer conversion on each received worker message. (This also eliminates a vestigal call into oxenc internals). --- oxenmq/jobs.cpp | 5 +++-- oxenmq/oxenmq.h | 5 +++-- oxenmq/proxy.cpp | 2 +- oxenmq/worker.cpp | 35 ++++++++++++++++++----------------- 4 files changed, 25 insertions(+), 22 deletions(-) diff --git a/oxenmq/jobs.cpp b/oxenmq/jobs.cpp index 3e056e5..b98993b 100644 --- a/oxenmq/jobs.cpp +++ b/oxenmq/jobs.cpp @@ -170,8 +170,9 @@ TaggedThreadID OxenMQ::add_tagged_thread(std::string name, std::function auto& [run, busy, queue] = tagged_workers.emplace_back(); busy = false; run.worker_id = tagged_workers.size(); // We want index + 1 (b/c 0 is used for non-tagged jobs) - run.worker_routing_id = "t" + std::to_string(run.worker_id); - OMQ_TRACE("Created new tagged thread ", name, " with routing id ", run.worker_routing_id); + run.worker_routing_name = "t" + std::to_string(run.worker_id); + run.worker_routing_id = "t" + std::string{reinterpret_cast(&run.worker_id), sizeof(run.worker_id)}; + OMQ_TRACE("Created new tagged thread ", name, " with routing id ", run.worker_routing_name); run.worker_thread = std::thread{&OxenMQ::worker_thread, this, run.worker_id, name, std::move(start)}; diff --git a/oxenmq/oxenmq.h b/oxenmq/oxenmq.h index 007d2e2..dc0a227 100644 --- a/oxenmq/oxenmq.h +++ b/oxenmq/oxenmq.h @@ -744,8 +744,9 @@ private: // These belong to the proxy thread and must not be accessed by a worker: std::thread worker_thread; - size_t worker_id; // The index in `workers` (0-n) or index+1 in `tagged_workers` (1-n) - std::string worker_routing_id; // "w123" where 123 == worker_id; "n123" for tagged threads. + uint32_t worker_id; // The index in `workers` (0-n) or index+1 in `tagged_workers` (1-n) + std::string worker_routing_id; // "wXXXX" where XXXX is the raw bytes of worker_id, or tXXXX for tagged threads. + std::string worker_routing_name; // "w123" or "t123" -- human readable version of worker_routing_id /// Loads the run info with an incoming command run_info& load(category* cat, std::string command, ConnectionID conn, Access access, std::string remote, diff --git a/oxenmq/proxy.cpp b/oxenmq/proxy.cpp index 6c08d49..cacbacd 100644 --- a/oxenmq/proxy.cpp +++ b/oxenmq/proxy.cpp @@ -482,7 +482,7 @@ void OxenMQ::proxy_loop_init() { } for (auto&w : tagged_workers) { - OMQ_LOG(debug, "Telling tagged thread worker ", std::get(w).worker_routing_id, " to finish startup"); + OMQ_LOG(debug, "Telling tagged thread worker ", std::get(w).worker_routing_name, " to finish startup"); route_control(workers_socket, std::get(w).worker_routing_id, "START"); } } diff --git a/oxenmq/worker.cpp b/oxenmq/worker.cpp index 168052e..b506736 100644 --- a/oxenmq/worker.cpp +++ b/oxenmq/worker.cpp @@ -48,10 +48,11 @@ bool worker_wait_for(OxenMQ& omq, zmq::socket_t& sock, std::vector tagged, std::function start) { - std::string routing_id = (tagged ? "t" : "w") + std::to_string(index); // for routing - std::string_view worker_id{tagged ? *tagged : routing_id}; // for debug + std::string routing_id = (tagged ? "t" : "w") + + std::string(reinterpret_cast(&index), sizeof(index)); // for routing + std::string worker_id{tagged ? *tagged : "w" + std::to_string(index)}; // for debug - [[maybe_unused]] std::string thread_name = tagged.value_or("omq-" + routing_id); + [[maybe_unused]] std::string thread_name = tagged.value_or("omq-" + worker_id); #if defined(__linux__) || defined(__sun) || defined(__MINGW32__) if (thread_name.size() > 15) thread_name.resize(15); pthread_setname_np(pthread_self(), thread_name.c_str()); @@ -167,7 +168,8 @@ OxenMQ::run_info& OxenMQ::get_idle_worker() { workers.emplace_back(); auto& r = workers.back(); r.worker_id = id; - r.worker_routing_id = "w" + std::to_string(id); + r.worker_routing_id = "w" + std::string(reinterpret_cast(&id), sizeof(id)); + r.worker_routing_name = "w" + std::to_string(id); return r; } size_t id = idle_workers.back(); @@ -182,18 +184,17 @@ void OxenMQ::proxy_worker_message(std::vector& parts) { return; } auto route = view(parts[0]), cmd = view(parts[1]); - OMQ_TRACE("worker message from ", route); - assert(route.size() >= 2 && (route[0] == 'w' || route[0] == 't') && route[1] >= '0' && route[1] <= '9'); + if (route.size() != 5 || (route[0] != 'w' && route[0] != 't')) { + OMQ_LOG(error, "Received malformed worker id in worker message; unable to process worker command"); + return; + } bool tagged_worker = route[0] == 't'; - std::string_view worker_id_str{&route[1], route.size()-1}; // Chop off the leading "w" (or "t") - unsigned int worker_id = oxenc::detail::extract_unsigned(worker_id_str); - if (!worker_id_str.empty() /* didn't consume everything */ || - (tagged_worker - ? 0 == worker_id || worker_id > tagged_workers.size() // tagged worker ids are indexed from 1 to N (0 means untagged) - : worker_id >= workers.size() // regular worker ids are indexed from 0 to N-1 - ) - ) { - OMQ_LOG(error, "Worker id '", route, "' is invalid, unable to process worker command"); + uint32_t worker_id; + std::memcpy(&worker_id, route.data() + 1, 4); + if (tagged_worker + ? 0 == worker_id || worker_id > tagged_workers.size() // tagged worker ids are indexed from 1 to N (0 means untagged) + : worker_id >= workers.size()) { // regular worker ids are indexed from 0 to N-1 + OMQ_LOG(error, "Received invalid worker id w" + std::to_string(worker_id) + " in worker message; unable to process worker command"); return; } @@ -380,7 +381,7 @@ void OxenMQ::proxy_to_worker(int64_t conn_id, zmq::socket_t& sock, std::vectoractivity(); // outgoing connection activity, pump the activity timer OMQ_TRACE("Forwarding incoming ", run.command, " from ", run.conn, " @ ", peer_address(parts[command_part_index]), - " to worker ", run.worker_routing_id); + " to worker ", run.worker_routing_name); proxy_run_worker(run); category.active_threads++; @@ -411,7 +412,7 @@ void OxenMQ::proxy_inject_task(injected_task task) { } auto& run = get_idle_worker(); - OMQ_TRACE("Forwarding incoming injected task ", task.command, " from ", task.remote, " to worker ", run.worker_routing_id); + OMQ_TRACE("Forwarding incoming injected task ", task.command, " from ", task.remote, " to worker ", run.worker_routing_name); run.load(&category, std::move(task.command), std::move(task.remote), std::move(task.callback)); proxy_run_worker(run); From 45791d3a191f595943d278cb3ee43c90acdd0958 Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Thu, 12 May 2022 12:42:08 -0300 Subject: [PATCH 06/10] Use fixed array for known-small internal messages Internal messages (control messages, worker messages) are always 3 parts or less, so we can optimize by using a stack allocated std::array for those cases rather than needing to continually clear and expand a heap allocated vector. --- oxenmq/oxenmq-internal.h | 15 +++++++++++++++ oxenmq/oxenmq.h | 6 ++++-- oxenmq/proxy.cpp | 22 +++++++++++++--------- oxenmq/worker.cpp | 6 +++--- 4 files changed, 35 insertions(+), 14 deletions(-) diff --git a/oxenmq/oxenmq-internal.h b/oxenmq/oxenmq-internal.h index 420c691..32f5596 100644 --- a/oxenmq/oxenmq-internal.h +++ b/oxenmq/oxenmq-internal.h @@ -86,6 +86,21 @@ inline bool recv_message_parts(zmq::socket_t& sock, std::vector& return true; } +// Same as above, but using a fixed sized array; this is only used for internal jobs (e.g. control +// messages) where we know the message parts should never exceed a given size (this function does +// not bounds check except in debug builds). Returns the number of message parts received, or 0 on +// read error. +template +inline size_t recv_message_parts(zmq::socket_t& sock, std::array& parts, const zmq::recv_flags flags = zmq::recv_flags::none) { + for (size_t count = 0; ; count++) { + assert(count < N); + if (!sock.recv(parts[count], flags)) + return 0; + if (!parts[count].more()) + return count + 1; + } +} + inline const char* peer_address(zmq::message_t& msg) { try { return msg.gets("Peer-Address"); } catch (...) {} return "(unknown)"; diff --git a/oxenmq/oxenmq.h b/oxenmq/oxenmq.h index dc0a227..315c217 100644 --- a/oxenmq/oxenmq.h +++ b/oxenmq/oxenmq.h @@ -484,7 +484,9 @@ private: void proxy_conn_cleanup(); - void proxy_worker_message(std::vector& parts); + using control_message_array = std::array; + + void proxy_worker_message(control_message_array& parts, size_t len); void proxy_process_queue(); @@ -608,7 +610,7 @@ private: void process_zap_requests(); /// Handles a control message from some outer thread to the proxy - void proxy_control_message(std::vector& parts); + void proxy_control_message(control_message_array& parts, size_t len); /// Closing any idle connections that have outlived their idle time. Note that this only /// affects outgoing connections; incomings connections are the responsibility of the other end. diff --git a/oxenmq/proxy.cpp b/oxenmq/proxy.cpp index cacbacd..1ef17de 100644 --- a/oxenmq/proxy.cpp +++ b/oxenmq/proxy.cpp @@ -271,14 +271,14 @@ void OxenMQ::proxy_reply(oxenc::bt_dict_consumer data) { } } -void OxenMQ::proxy_control_message(std::vector& parts) { +void OxenMQ::proxy_control_message(OxenMQ::control_message_array& parts, size_t len) { // We throw an uncaught exception here because we only generate control messages internally in // oxenmq code: if one of these condition fail it's a oxenmq bug. - if (parts.size() < 2) + if (len < 2) throw std::logic_error("OxenMQ bug: Expected 2-3 message parts for a proxy control message"); auto route = view(parts[0]), cmd = view(parts[1]); OMQ_TRACE("control message: ", cmd); - if (parts.size() == 3) { + if (len == 3) { OMQ_TRACE("...: ", parts[2]); auto data = view(parts[2]); if (cmd == "SEND") { @@ -315,7 +315,7 @@ void OxenMQ::proxy_control_message(std::vector& parts) { bind.push_back(std::move(b)); return; } - } else if (parts.size() == 2) { + } else if (len == 2) { if (cmd == "START") { // Command send by the owning thread during startup; we send back a simple READY reply to // let it know we are running. @@ -335,7 +335,7 @@ void OxenMQ::proxy_control_message(std::vector& parts) { } } throw std::runtime_error("OxenMQ bug: Proxy received invalid control command: " + - std::string{cmd} + " (" + std::to_string(parts.size()) + ")"); + std::string{cmd} + " (" + std::to_string(len) + ")"); } bool OxenMQ::proxy_bind(bind_data& b, size_t bind_index) { @@ -497,6 +497,10 @@ void OxenMQ::proxy_loop(std::promise startup) { } startup.set_value(); + // Fixed array used for worker and control messages: these are never longer than 3 parts: + std::array control_parts; + + // General vector for handling incoming messages: std::vector parts; while (true) { @@ -529,13 +533,13 @@ void OxenMQ::proxy_loop(std::promise startup) { OMQ_TRACE("processing control messages"); // Retrieve any waiting incoming control messages - for (parts.clear(); recv_message_parts(command, parts, zmq::recv_flags::dontwait); parts.clear()) { - proxy_control_message(parts); + while (size_t len = recv_message_parts(command, control_parts, zmq::recv_flags::dontwait)) { + proxy_control_message(control_parts, len); } OMQ_TRACE("processing worker messages"); - for (parts.clear(); recv_message_parts(workers_socket, parts, zmq::recv_flags::dontwait); parts.clear()) { - proxy_worker_message(parts); + while (size_t len = recv_message_parts(workers_socket, control_parts, zmq::recv_flags::dontwait)) { + proxy_worker_message(control_parts, len); } OMQ_TRACE("processing timers"); diff --git a/oxenmq/worker.cpp b/oxenmq/worker.cpp index b506736..111e049 100644 --- a/oxenmq/worker.cpp +++ b/oxenmq/worker.cpp @@ -177,10 +177,10 @@ OxenMQ::run_info& OxenMQ::get_idle_worker() { return workers[id]; } -void OxenMQ::proxy_worker_message(std::vector& parts) { +void OxenMQ::proxy_worker_message(OxenMQ::control_message_array& parts, size_t len) { // Process messages sent by workers - if (parts.size() != 2) { - OMQ_LOG(error, "Received send invalid ", parts.size(), "-part message"); + if (len != 2) { + OMQ_LOG(error, "Received send invalid ", len, "-part message"); return; } auto route = view(parts[0]), cmd = view(parts[1]); From d86ecb3a70ef76b1344f88ac4226f9ad7f90b93f Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Thu, 12 May 2022 12:44:54 -0300 Subject: [PATCH 07/10] Use fixed vector for idle workers Use a count + fixed size vector with a separate variable tracking the size seems to perform slightly better than popping/pushing the vector. --- oxenmq/oxenmq.h | 5 +++-- oxenmq/proxy.cpp | 7 ++++--- oxenmq/worker.cpp | 10 ++++------ 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/oxenmq/oxenmq.h b/oxenmq/oxenmq.h index 315c217..408ba9f 100644 --- a/oxenmq/oxenmq.h +++ b/oxenmq/oxenmq.h @@ -456,8 +456,9 @@ private: /// Router socket to reach internal worker threads from proxy zmq::socket_t workers_socket{context, zmq::socket_type::router}; - /// indices of idle, active workers + /// indices of idle, active workers; note that this vector is usually oversized std::vector idle_workers; + size_t idle_worker_count = 0; // Actual # elements of idle_workers in use /// Maximum number of general task workers, specified by set_general_threads() int general_workers = std::max(1, std::thread::hardware_concurrency()); @@ -469,7 +470,7 @@ private: int max_workers; /// Number of active workers - int active_workers() const { return workers.size() - idle_workers.size(); } + int active_workers() const { return workers.size() - idle_worker_count; } /// Worker thread loop. Tagged and start are provided for a tagged worker thread. void worker_thread(unsigned int index, std::optional tagged = std::nullopt, std::function start = nullptr); diff --git a/oxenmq/proxy.cpp b/oxenmq/proxy.cpp index 1ef17de..28275c2 100644 --- a/oxenmq/proxy.cpp +++ b/oxenmq/proxy.cpp @@ -325,9 +325,9 @@ void OxenMQ::proxy_control_message(OxenMQ::control_message_array& parts, size_t // close workers as they come back to READY status, and then close external // connections once all workers are done. max_workers = 0; - for (const auto &route : idle_workers) - route_control(workers_socket, workers[route].worker_routing_id, "QUIT"); - idle_workers.clear(); + for (size_t i = 0; i < idle_worker_count; i++) + route_control(workers_socket, workers[idle_workers[i]].worker_routing_id, "QUIT"); + idle_worker_count = 0; for (auto& [run, busy, queue] : tagged_workers) if (!busy) route_control(workers_socket, run.worker_routing_id, "QUIT"); @@ -404,6 +404,7 @@ void OxenMQ::proxy_loop_init() { } workers.reserve(max_workers); + idle_workers.resize(max_workers); if (!workers.empty()) throw std::logic_error("Internal error: proxy thread started with active worker threads"); diff --git a/oxenmq/worker.cpp b/oxenmq/worker.cpp index 111e049..9a6a7bf 100644 --- a/oxenmq/worker.cpp +++ b/oxenmq/worker.cpp @@ -162,9 +162,8 @@ void OxenMQ::worker_thread(unsigned int index, std::optional tagged OxenMQ::run_info& OxenMQ::get_idle_worker() { - if (idle_workers.empty()) { - size_t id = workers.size(); - assert(workers.capacity() > id); + if (idle_worker_count == 0) { + uint32_t id = workers.size(); workers.emplace_back(); auto& r = workers.back(); r.worker_id = id; @@ -172,8 +171,7 @@ OxenMQ::run_info& OxenMQ::get_idle_worker() { r.worker_routing_name = "w" + std::to_string(id); return r; } - size_t id = idle_workers.back(); - idle_workers.pop_back(); + size_t id = idle_workers[--idle_worker_count]; return workers[id]; } @@ -259,7 +257,7 @@ void OxenMQ::proxy_worker_message(OxenMQ::control_message_array& parts, size_t l OMQ_TRACE("Telling worker ", route, " to quit"); route_control(workers_socket, route, "QUIT"); } else if (!tagged_worker) { - idle_workers.push_back(worker_id); + idle_workers[idle_worker_count++] = worker_id; } } else if (cmd == "QUITTING"sv) { run.worker_thread.join(); From 62a803f371ed1da473fc700a0ea0e5ad36b7539b Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Thu, 12 May 2022 12:48:15 -0300 Subject: [PATCH 08/10] Add missing header This was surely coming in implicitly already, but better to be explicit. --- oxenmq/oxenmq.h | 1 + 1 file changed, 1 insertion(+) diff --git a/oxenmq/oxenmq.h b/oxenmq/oxenmq.h index 408ba9f..996cd24 100644 --- a/oxenmq/oxenmq.h +++ b/oxenmq/oxenmq.h @@ -45,6 +45,7 @@ #include #include #include +#include #include "zmq.hpp" #include "address.h" #include From ace6ea9d8e4c6bbe7cefa5c661e013a2ba4b8639 Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Thu, 12 May 2022 12:48:46 -0300 Subject: [PATCH 09/10] Avoid unnecessary nullptr assignment We can just leave the dangling pointer value in the `run` object: even though we just deleted it, there's no need to reset this value because it will never be used again. (And even if we did, we don't check against nullptr anyway so having a nullptr here doesn't make anything safter than a dangling pointer). The assignment (into the variant) uses a small amount of CPU (via std::variant), so better for performance to just leave it dangling. --- oxenmq/worker.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/oxenmq/worker.cpp b/oxenmq/worker.cpp index 9a6a7bf..d10e938 100644 --- a/oxenmq/worker.cpp +++ b/oxenmq/worker.cpp @@ -247,7 +247,6 @@ void OxenMQ::proxy_worker_message(OxenMQ::control_message_array& parts, size_t l if (clear_job) { delete batch; - run.to_run = static_cast(nullptr); } } else { assert(run.cat->active_threads > 0); From 115c5550ca78c65a658770d134faa400413da63d Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Tue, 24 May 2022 16:15:39 -0300 Subject: [PATCH 10/10] Bump version & embedded oxenc version --- CMakeLists.txt | 2 +- oxen-encoding | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index b3c0c03..b214872 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -17,7 +17,7 @@ cmake_minimum_required(VERSION 3.7) set(CMAKE_OSX_DEPLOYMENT_TARGET 10.12 CACHE STRING "macOS deployment target (Apple clang only)") project(liboxenmq - VERSION 1.2.11 + VERSION 1.2.12 LANGUAGES CXX C) include(GNUInstallDirs) diff --git a/oxen-encoding b/oxen-encoding index a0912ab..79193e5 160000 --- a/oxen-encoding +++ b/oxen-encoding @@ -1 +1 @@ -Subproject commit a0912ab4bf3b5e83b42715eff6f632c8912b21e4 +Subproject commit 79193e58fb26624d40cd2e95156f78160f2b9b3e