diff --git a/CMakeLists.txt b/CMakeLists.txt index 41fdebc..b214872 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -17,7 +17,7 @@ cmake_minimum_required(VERSION 3.7) set(CMAKE_OSX_DEPLOYMENT_TARGET 10.12 CACHE STRING "macOS deployment target (Apple clang only)") project(liboxenmq - VERSION 1.2.11 + VERSION 1.2.12 LANGUAGES CXX C) include(GNUInstallDirs) @@ -140,8 +140,12 @@ if(WARNINGS_AS_ERRORS) target_compile_options(oxenmq PRIVATE -Werror) endif() -target_compile_features(oxenmq PUBLIC cxx_std_17) -set_target_properties(oxenmq PROPERTIES POSITION_INDEPENDENT_CODE ON) +set_target_properties(oxenmq PROPERTIES + POSITION_INDEPENDENT_CODE ON + CXX_STANDARD 17 + CXX_STANDARD_REQUIRED ON + CXX_EXTENSIONS OFF +) function(link_dep_libs target linktype libdirs) foreach(lib ${ARGN}) diff --git a/oxen-encoding b/oxen-encoding index a0912ab..79193e5 160000 --- a/oxen-encoding +++ b/oxen-encoding @@ -1 +1 @@ -Subproject commit a0912ab4bf3b5e83b42715eff6f632c8912b21e4 +Subproject commit 79193e58fb26624d40cd2e95156f78160f2b9b3e diff --git a/oxenmq/batch.h b/oxenmq/batch.h index 5fbdf18..6c313fc 100644 --- a/oxenmq/batch.h +++ b/oxenmq/batch.h @@ -266,6 +266,52 @@ private: } }; +// Similar to Batch, but doesn't support a completion function and only handles a single task. +class Job final : private detail::Batch { + friend class OxenMQ; +public: + /// Constructs the Job to run a single task. Takes any callable invokable with no arguments and + /// having no return value. The task will be scheduled and run when the next worker thread is + /// available. Any exceptions thrown by the job will be caught and squelched (the exception + /// terminates/completes the job). + + explicit Job(std::function f, std::optional thread = std::nullopt) + : Job{std::move(f), thread ? thread->_id : 0} + { + if (thread && thread->_id == -1) + // There are some special case internal jobs where we allow this, but they use the + // private ctor below that doesn't have this check. + throw std::logic_error{"Cannot add a proxy thread job -- this makes no sense"}; + } + + // movable + Job(Job&&) = default; + Job &operator=(Job&&) = default; + + // non-copyable + Job(const Job&) = delete; + Job &operator=(const Job&) = delete; + +private: + explicit Job(std::function f, int thread_id) + : job{std::move(f), thread_id} {} + + std::pair, int> job; + bool done = false; + + std::pair size() const override { return {1, job.second != 0}; } + std::vector threads() const override { return {job.second}; } + + void run_job(const int /*i*/) override { + try { job.first(); } + catch (...) {} + } + + detail::BatchStatus job_finished() override { return {detail::BatchState::done, 0}; } + + void job_completion() override {} // Never called because we return ::done (not ::complete) above. + +}; template void OxenMQ::batch(Batch&& batch) { diff --git a/oxenmq/jobs.cpp b/oxenmq/jobs.cpp index ca38f54..b98993b 100644 --- a/oxenmq/jobs.cpp +++ b/oxenmq/jobs.cpp @@ -5,21 +5,20 @@ namespace oxenmq { void OxenMQ::proxy_batch(detail::Batch* batch) { - batches.insert(batch); const auto [jobs, tagged_threads] = batch->size(); OMQ_TRACE("proxy queuing batch job with ", jobs, " jobs", tagged_threads ? " (job uses tagged thread(s))" : ""); if (!tagged_threads) { for (size_t i = 0; i < jobs; i++) - batch_jobs.emplace(batch, i); + batch_jobs.emplace_back(batch, i); } else { // Some (or all) jobs have a specific thread target so queue any such jobs in the tagged // worker queue. auto threads = batch->threads(); for (size_t i = 0; i < jobs; i++) { auto& jobs = threads[i] > 0 - ? std::get>(tagged_workers[threads[i] - 1]) + ? std::get(tagged_workers[threads[i] - 1]) : batch_jobs; - jobs.emplace(batch, i); + jobs.emplace_back(batch, i); } } @@ -29,25 +28,22 @@ void OxenMQ::proxy_batch(detail::Batch* batch) { void OxenMQ::job(std::function f, std::optional thread) { if (thread && thread->_id == -1) throw std::logic_error{"job() cannot be used to queue an in-proxy job"}; - auto* b = new Batch; - b->add_job(std::move(f), thread); - auto* baseptr = static_cast(b); + auto* j = new Job(std::move(f), thread); + auto* baseptr = static_cast(j); detail::send_control(get_control_socket(), "BATCH", oxenc::bt_serialize(reinterpret_cast(baseptr))); } void OxenMQ::proxy_schedule_reply_job(std::function f) { - auto* b = new Batch; - b->add_job(std::move(f)); - batches.insert(b); - reply_jobs.emplace(static_cast(b), 0); + auto* j = new Job(std::move(f)); + reply_jobs.emplace_back(static_cast(j), 0); proxy_skip_one_poll = true; } -void OxenMQ::proxy_run_batch_jobs(std::queue& jobs, const int reserved, int& active, bool reply) { +void OxenMQ::proxy_run_batch_jobs(batch_queue& jobs, const int reserved, int& active, bool reply) { while (!jobs.empty() && active_workers() < max_workers && (active < reserved || active_workers() < general_workers)) { proxy_run_worker(get_idle_worker().load(std::move(jobs.front()), reply)); - jobs.pop(); + jobs.pop_front(); active++; } } @@ -98,11 +94,12 @@ void OxenMQ::_queue_timer_job(int timer_id) { return; } - auto* b = new Batch; - b->add_job(func, thread); + detail::Batch* b; if (squelch) { + auto* bv = new Batch; + bv->add_job(func, thread); running = true; - b->completion([this,timer_id](auto results) { + bv->completion([this,timer_id](auto results) { try { results[0].get(); } catch (const std::exception &e) { OMQ_LOG(warn, "timer job ", timer_id, " raised an exception: ", e.what()); } catch (...) { OMQ_LOG(warn, "timer job ", timer_id, " raised a non-std exception"); } @@ -110,14 +107,16 @@ void OxenMQ::_queue_timer_job(int timer_id) { if (it != timer_jobs.end()) it->second.running = false; }, OxenMQ::run_in_proxy); + b = bv; + } else { + b = new Job(func, thread); } - batches.insert(b); OMQ_TRACE("b: ", b->size().first, ", ", b->size().second, "; thread = ", thread); assert(b->size() == std::make_pair(size_t{1}, thread > 0)); auto& queue = thread > 0 - ? std::get>(tagged_workers[thread - 1]) + ? std::get(tagged_workers[thread - 1]) : batch_jobs; - queue.emplace(static_cast(b), 0); + queue.emplace_back(static_cast(b), 0); } void OxenMQ::add_timer(TimerID& timer, std::function job, std::chrono::milliseconds interval, bool squelch, std::optional thread) { @@ -171,8 +170,9 @@ TaggedThreadID OxenMQ::add_tagged_thread(std::string name, std::function auto& [run, busy, queue] = tagged_workers.emplace_back(); busy = false; run.worker_id = tagged_workers.size(); // We want index + 1 (b/c 0 is used for non-tagged jobs) - run.worker_routing_id = "t" + std::to_string(run.worker_id); - OMQ_TRACE("Created new tagged thread ", name, " with routing id ", run.worker_routing_id); + run.worker_routing_name = "t" + std::to_string(run.worker_id); + run.worker_routing_id = "t" + std::string{reinterpret_cast(&run.worker_id), sizeof(run.worker_id)}; + OMQ_TRACE("Created new tagged thread ", name, " with routing id ", run.worker_routing_name); run.worker_thread = std::thread{&OxenMQ::worker_thread, this, run.worker_id, name, std::move(start)}; diff --git a/oxenmq/oxenmq-internal.h b/oxenmq/oxenmq-internal.h index 420c691..32f5596 100644 --- a/oxenmq/oxenmq-internal.h +++ b/oxenmq/oxenmq-internal.h @@ -86,6 +86,21 @@ inline bool recv_message_parts(zmq::socket_t& sock, std::vector& return true; } +// Same as above, but using a fixed sized array; this is only used for internal jobs (e.g. control +// messages) where we know the message parts should never exceed a given size (this function does +// not bounds check except in debug builds). Returns the number of message parts received, or 0 on +// read error. +template +inline size_t recv_message_parts(zmq::socket_t& sock, std::array& parts, const zmq::recv_flags flags = zmq::recv_flags::none) { + for (size_t count = 0; ; count++) { + assert(count < N); + if (!sock.recv(parts[count], flags)) + return 0; + if (!parts[count].more()) + return count + 1; + } +} + inline const char* peer_address(zmq::message_t& msg) { try { return msg.gets("Peer-Address"); } catch (...) {} return "(unknown)"; diff --git a/oxenmq/oxenmq.h b/oxenmq/oxenmq.h index 3ed1e44..996cd24 100644 --- a/oxenmq/oxenmq.h +++ b/oxenmq/oxenmq.h @@ -45,6 +45,7 @@ #include #include #include +#include #include "zmq.hpp" #include "address.h" #include @@ -106,6 +107,7 @@ private: explicit constexpr TaggedThreadID(int id) : _id{id} {} friend class OxenMQ; template friend class Batch; + friend class Job; }; /// Opaque handler for a timer constructed by add_timer(...). Safe (and cheap) to copy. The only @@ -455,8 +457,9 @@ private: /// Router socket to reach internal worker threads from proxy zmq::socket_t workers_socket{context, zmq::socket_type::router}; - /// indices of idle, active workers + /// indices of idle, active workers; note that this vector is usually oversized std::vector idle_workers; + size_t idle_worker_count = 0; // Actual # elements of idle_workers in use /// Maximum number of general task workers, specified by set_general_threads() int general_workers = std::max(1, std::thread::hardware_concurrency()); @@ -468,7 +471,7 @@ private: int max_workers; /// Number of active workers - int active_workers() const { return workers.size() - idle_workers.size(); } + int active_workers() const { return workers.size() - idle_worker_count; } /// Worker thread loop. Tagged and start are provided for a tagged worker thread. void worker_thread(unsigned int index, std::optional tagged = std::nullopt, std::function start = nullptr); @@ -483,7 +486,9 @@ private: void proxy_conn_cleanup(); - void proxy_worker_message(std::vector& parts); + using control_message_array = std::array; + + void proxy_worker_message(control_message_array& parts, size_t len); void proxy_process_queue(); @@ -575,18 +580,17 @@ private: /// weaker (i.e. it cannot reconnect to the SN if the connection is no longer open). void proxy_reply(oxenc::bt_dict_consumer data); - /// Currently active batch/reply jobs; this is the container that owns the Batch instances - std::unordered_set batches; /// Individual batch jobs waiting to run; .second is the 0-n batch number or -1 for the /// completion job using batch_job = std::pair; - std::queue batch_jobs, reply_jobs; + using batch_queue = std::deque; + batch_queue batch_jobs, reply_jobs; int batch_jobs_active = 0; int reply_jobs_active = 0; int batch_jobs_reserved = -1; int reply_jobs_reserved = -1; /// Runs any queued batch jobs - void proxy_run_batch_jobs(std::queue& jobs, int reserved, int& active, bool reply); + void proxy_run_batch_jobs(batch_queue& jobs, int reserved, int& active, bool reply); /// BATCH command. Called with a Batch (see oxenmq/batch.h) object pointer for the proxy to /// take over and queue batch jobs. @@ -608,7 +612,7 @@ private: void process_zap_requests(); /// Handles a control message from some outer thread to the proxy - void proxy_control_message(std::vector& parts); + void proxy_control_message(control_message_array& parts, size_t len); /// Closing any idle connections that have outlived their idle time. Note that this only /// affects outgoing connections; incomings connections are the responsibility of the other end. @@ -744,8 +748,9 @@ private: // These belong to the proxy thread and must not be accessed by a worker: std::thread worker_thread; - size_t worker_id; // The index in `workers` (0-n) or index+1 in `tagged_workers` (1-n) - std::string worker_routing_id; // "w123" where 123 == worker_id; "n123" for tagged threads. + uint32_t worker_id; // The index in `workers` (0-n) or index+1 in `tagged_workers` (1-n) + std::string worker_routing_id; // "wXXXX" where XXXX is the raw bytes of worker_id, or tXXXX for tagged threads. + std::string worker_routing_name; // "w123" or "t123" -- human readable version of worker_routing_id /// Loads the run info with an incoming command run_info& load(category* cat, std::string command, ConnectionID conn, Access access, std::string remote, @@ -769,7 +774,7 @@ private: /// Workers that are reserved for tagged thread tasks (as created with add_tagged_thread). The /// queue here is similar to worker_jobs, but contains only the tagged thread's jobs. The bool /// is whether the worker is currently busy (true) or available (false). - std::vector>> tagged_workers; + std::vector> tagged_workers; public: /** diff --git a/oxenmq/proxy.cpp b/oxenmq/proxy.cpp index 3e63db2..28275c2 100644 --- a/oxenmq/proxy.cpp +++ b/oxenmq/proxy.cpp @@ -271,14 +271,14 @@ void OxenMQ::proxy_reply(oxenc::bt_dict_consumer data) { } } -void OxenMQ::proxy_control_message(std::vector& parts) { +void OxenMQ::proxy_control_message(OxenMQ::control_message_array& parts, size_t len) { // We throw an uncaught exception here because we only generate control messages internally in // oxenmq code: if one of these condition fail it's a oxenmq bug. - if (parts.size() < 2) + if (len < 2) throw std::logic_error("OxenMQ bug: Expected 2-3 message parts for a proxy control message"); auto route = view(parts[0]), cmd = view(parts[1]); OMQ_TRACE("control message: ", cmd); - if (parts.size() == 3) { + if (len == 3) { OMQ_TRACE("...: ", parts[2]); auto data = view(parts[2]); if (cmd == "SEND") { @@ -315,7 +315,7 @@ void OxenMQ::proxy_control_message(std::vector& parts) { bind.push_back(std::move(b)); return; } - } else if (parts.size() == 2) { + } else if (len == 2) { if (cmd == "START") { // Command send by the owning thread during startup; we send back a simple READY reply to // let it know we are running. @@ -325,9 +325,9 @@ void OxenMQ::proxy_control_message(std::vector& parts) { // close workers as they come back to READY status, and then close external // connections once all workers are done. max_workers = 0; - for (const auto &route : idle_workers) - route_control(workers_socket, workers[route].worker_routing_id, "QUIT"); - idle_workers.clear(); + for (size_t i = 0; i < idle_worker_count; i++) + route_control(workers_socket, workers[idle_workers[i]].worker_routing_id, "QUIT"); + idle_worker_count = 0; for (auto& [run, busy, queue] : tagged_workers) if (!busy) route_control(workers_socket, run.worker_routing_id, "QUIT"); @@ -335,7 +335,7 @@ void OxenMQ::proxy_control_message(std::vector& parts) { } } throw std::runtime_error("OxenMQ bug: Proxy received invalid control command: " + - std::string{cmd} + " (" + std::to_string(parts.size()) + ")"); + std::string{cmd} + " (" + std::to_string(len) + ")"); } bool OxenMQ::proxy_bind(bind_data& b, size_t bind_index) { @@ -404,6 +404,7 @@ void OxenMQ::proxy_loop_init() { } workers.reserve(max_workers); + idle_workers.resize(max_workers); if (!workers.empty()) throw std::logic_error("Internal error: proxy thread started with active worker threads"); @@ -482,7 +483,7 @@ void OxenMQ::proxy_loop_init() { } for (auto&w : tagged_workers) { - OMQ_LOG(debug, "Telling tagged thread worker ", std::get(w).worker_routing_id, " to finish startup"); + OMQ_LOG(debug, "Telling tagged thread worker ", std::get(w).worker_routing_name, " to finish startup"); route_control(workers_socket, std::get(w).worker_routing_id, "START"); } } @@ -497,6 +498,10 @@ void OxenMQ::proxy_loop(std::promise startup) { } startup.set_value(); + // Fixed array used for worker and control messages: these are never longer than 3 parts: + std::array control_parts; + + // General vector for handling incoming messages: std::vector parts; while (true) { @@ -529,13 +534,13 @@ void OxenMQ::proxy_loop(std::promise startup) { OMQ_TRACE("processing control messages"); // Retrieve any waiting incoming control messages - for (parts.clear(); recv_message_parts(command, parts, zmq::recv_flags::dontwait); parts.clear()) { - proxy_control_message(parts); + while (size_t len = recv_message_parts(command, control_parts, zmq::recv_flags::dontwait)) { + proxy_control_message(control_parts, len); } OMQ_TRACE("processing worker messages"); - for (parts.clear(); recv_message_parts(workers_socket, parts, zmq::recv_flags::dontwait); parts.clear()) { - proxy_worker_message(parts); + while (size_t len = recv_message_parts(workers_socket, control_parts, zmq::recv_flags::dontwait)) { + proxy_worker_message(control_parts, len); } OMQ_TRACE("processing timers"); @@ -731,7 +736,7 @@ void OxenMQ::proxy_process_queue() { if (!busy && !queue.empty()) { busy = true; proxy_run_worker(run.load(std::move(queue.front()), false, run.worker_id)); - queue.pop(); + queue.pop_front(); } } diff --git a/oxenmq/worker.cpp b/oxenmq/worker.cpp index 222e68a..d10e938 100644 --- a/oxenmq/worker.cpp +++ b/oxenmq/worker.cpp @@ -48,10 +48,11 @@ bool worker_wait_for(OxenMQ& omq, zmq::socket_t& sock, std::vector tagged, std::function start) { - std::string routing_id = (tagged ? "t" : "w") + std::to_string(index); // for routing - std::string_view worker_id{tagged ? *tagged : routing_id}; // for debug + std::string routing_id = (tagged ? "t" : "w") + + std::string(reinterpret_cast(&index), sizeof(index)); // for routing + std::string worker_id{tagged ? *tagged : "w" + std::to_string(index)}; // for debug - [[maybe_unused]] std::string thread_name = tagged.value_or("omq-" + routing_id); + [[maybe_unused]] std::string thread_name = tagged.value_or("omq-" + worker_id); #if defined(__linux__) || defined(__sun) || defined(__MINGW32__) if (thread_name.size() > 15) thread_name.resize(15); pthread_setname_np(pthread_self(), thread_name.c_str()); @@ -161,39 +162,37 @@ void OxenMQ::worker_thread(unsigned int index, std::optional tagged OxenMQ::run_info& OxenMQ::get_idle_worker() { - if (idle_workers.empty()) { - size_t id = workers.size(); - assert(workers.capacity() > id); + if (idle_worker_count == 0) { + uint32_t id = workers.size(); workers.emplace_back(); auto& r = workers.back(); r.worker_id = id; - r.worker_routing_id = "w" + std::to_string(id); + r.worker_routing_id = "w" + std::string(reinterpret_cast(&id), sizeof(id)); + r.worker_routing_name = "w" + std::to_string(id); return r; } - size_t id = idle_workers.back(); - idle_workers.pop_back(); + size_t id = idle_workers[--idle_worker_count]; return workers[id]; } -void OxenMQ::proxy_worker_message(std::vector& parts) { +void OxenMQ::proxy_worker_message(OxenMQ::control_message_array& parts, size_t len) { // Process messages sent by workers - if (parts.size() != 2) { - OMQ_LOG(error, "Received send invalid ", parts.size(), "-part message"); + if (len != 2) { + OMQ_LOG(error, "Received send invalid ", len, "-part message"); return; } auto route = view(parts[0]), cmd = view(parts[1]); - OMQ_TRACE("worker message from ", route); - assert(route.size() >= 2 && (route[0] == 'w' || route[0] == 't') && route[1] >= '0' && route[1] <= '9'); + if (route.size() != 5 || (route[0] != 'w' && route[0] != 't')) { + OMQ_LOG(error, "Received malformed worker id in worker message; unable to process worker command"); + return; + } bool tagged_worker = route[0] == 't'; - std::string_view worker_id_str{&route[1], route.size()-1}; // Chop off the leading "w" (or "t") - unsigned int worker_id = oxenc::detail::extract_unsigned(worker_id_str); - if (!worker_id_str.empty() /* didn't consume everything */ || - (tagged_worker - ? 0 == worker_id || worker_id > tagged_workers.size() // tagged worker ids are indexed from 1 to N (0 means untagged) - : worker_id >= workers.size() // regular worker ids are indexed from 0 to N-1 - ) - ) { - OMQ_LOG(error, "Worker id '", route, "' is invalid, unable to process worker command"); + uint32_t worker_id; + std::memcpy(&worker_id, route.data() + 1, 4); + if (tagged_worker + ? 0 == worker_id || worker_id > tagged_workers.size() // tagged worker ids are indexed from 1 to N (0 means untagged) + : worker_id >= workers.size()) { // regular worker ids are indexed from 0 to N-1 + OMQ_LOG(error, "Received invalid worker id w" + std::to_string(worker_id) + " in worker message; unable to process worker command"); return; } @@ -233,11 +232,11 @@ void OxenMQ::proxy_worker_message(std::vector& parts) { } else { auto& jobs = thread > 0 - ? std::get>(tagged_workers[thread - 1]) // run in tagged thread + ? std::get(tagged_workers[thread - 1]) // run in tagged thread : run.is_reply_job ? reply_jobs : batch_jobs; - jobs.emplace(batch, -1); + jobs.emplace_back(batch, -1); } } else if (state == detail::BatchState::done) { // No completion job @@ -247,9 +246,7 @@ void OxenMQ::proxy_worker_message(std::vector& parts) { } if (clear_job) { - batches.erase(batch); delete batch; - run.to_run = static_cast(nullptr); } } else { assert(run.cat->active_threads > 0); @@ -259,7 +256,7 @@ void OxenMQ::proxy_worker_message(std::vector& parts) { OMQ_TRACE("Telling worker ", route, " to quit"); route_control(workers_socket, route, "QUIT"); } else if (!tagged_worker) { - idle_workers.push_back(worker_id); + idle_workers[idle_worker_count++] = worker_id; } } else if (cmd == "QUITTING"sv) { run.worker_thread.join(); @@ -381,7 +378,7 @@ void OxenMQ::proxy_to_worker(int64_t conn_id, zmq::socket_t& sock, std::vectoractivity(); // outgoing connection activity, pump the activity timer OMQ_TRACE("Forwarding incoming ", run.command, " from ", run.conn, " @ ", peer_address(parts[command_part_index]), - " to worker ", run.worker_routing_id); + " to worker ", run.worker_routing_name); proxy_run_worker(run); category.active_threads++; @@ -412,7 +409,7 @@ void OxenMQ::proxy_inject_task(injected_task task) { } auto& run = get_idle_worker(); - OMQ_TRACE("Forwarding incoming injected task ", task.command, " from ", task.remote, " to worker ", run.worker_routing_id); + OMQ_TRACE("Forwarding incoming injected task ", task.command, " from ", task.remote, " to worker ", run.worker_routing_name); run.load(&category, std::move(task.command), std::move(task.remote), std::move(task.callback)); proxy_run_worker(run);