diff --git a/lokimq/lokimq.cpp b/lokimq/lokimq.cpp index 1b5bad0..3c6f54d 100644 --- a/lokimq/lokimq.cpp +++ b/lokimq/lokimq.cpp @@ -821,15 +821,15 @@ void LokiMQ::proxy_batch(detail::Batch* batch) { const int jobs = batch->size(); for (int i = 0; i < jobs; i++) batch_jobs.emplace(batch, i); + proxy_skip_one_poll = true; } -Batch* LokiMQ::proxy_schedule_job(std::function f) { +void LokiMQ::proxy_schedule_reply_job(std::function f) { auto* b = new Batch; b->add_job(std::move(f)); batches.insert(b); - batch_jobs.emplace(static_cast(b), 0); - proxy_skip_poll = true; - return b; + reply_jobs.emplace(static_cast(b), 0); + proxy_skip_one_poll = true; } // Called either within the proxy thread, or before the proxy thread has been created; actually adds @@ -954,7 +954,7 @@ void LokiMQ::proxy_conn_cleanup() { // Drop idle connections (if we haven't done it in a while) but *only* if we have some idle // general workers: if we don't have any idle workers then we may still have incoming messages which // we haven't processed yet and those messages might end up resetting the last activity time. - if (workers.size() < general_workers) { + if (static_cast(workers.size()) < general_workers) { LMQ_TRACE("closing idle connections"); proxy_expire_idle_peers(); } @@ -1012,10 +1012,13 @@ void LokiMQ::proxy_loop() { workers_socket.setsockopt(ZMQ_ROUTER_MANDATORY, 1); workers_socket.bind(SN_ADDR_WORKERS); - if (general_workers == 0) - general_workers = std::max(std::thread::hardware_concurrency(), 1u); + assert(general_workers > 0); + if (batch_jobs_reserved < 0) + batch_jobs_reserved = (general_workers + 1) / 2; + if (reply_jobs_reserved < 0) + reply_jobs_reserved = (general_workers + 7) / 8; - max_workers = general_workers + batch_jobs_reserved; + max_workers = general_workers + batch_jobs_reserved + reply_jobs_reserved; for (const auto& cat : categories) { max_workers += cat.second.reserved_threads; } @@ -1026,6 +1029,7 @@ void LokiMQ::proxy_loop() { for (const auto& cat : categories) LMQ_TRACE(" - ", cat.first, ": ", cat.second.reserved_threads); LMQ_TRACE(" - (batch jobs): ", batch_jobs_reserved); + LMQ_TRACE(" - (reply jobs): ", reply_jobs_reserved); } #endif @@ -1093,8 +1097,8 @@ void LokiMQ::proxy_loop() { poll_timeout = std::chrono::milliseconds{zmq_timers_timeout(timers.get())}; } - if (proxy_skip_poll) - proxy_skip_poll = false; + if (proxy_skip_one_poll) + proxy_skip_one_poll = false; else { LMQ_TRACE("polling for new messages"); @@ -1141,7 +1145,7 @@ void LokiMQ::proxy_loop() { for (int i = 0; i < num_sockets; i++) queue_index.push(i); - for (parts.clear(); !queue_index.empty() && workers.size() < max_workers; parts.clear()) { + for (parts.clear(); !queue_index.empty() && static_cast(workers.size()) < max_workers; parts.clear()) { size_t i = queue_index.front(); queue_index.pop(); auto& sock = connections[i]; @@ -1225,8 +1229,10 @@ void LokiMQ::proxy_worker_message(std::vector& parts) { if (cmd == "RAN") { LMQ_LOG(debug, "Worker ", route, " finished ", run.command); if (run.is_batch_job) { - assert(batch_jobs_active > 0); - batch_jobs_active--; + auto& jobs = run.is_reply_job ? reply_jobs : batch_jobs; + auto& active = run.is_reply_job ? reply_jobs_active : batch_jobs_active; + assert(active > 0); + active--; bool clear_job = false; if (run.batch_jobno == -1) { // Returned from the completion function @@ -1234,13 +1240,13 @@ void LokiMQ::proxy_worker_message(std::vector& parts) { } else { auto status = run.batch->job_finished(); if (status == detail::BatchStatus::complete) { - batch_jobs.emplace(run.batch, -1); + jobs.emplace(run.batch, -1); } else if (status == detail::BatchStatus::complete_proxy) { try { run.batch->job_completion(); // RUN DIRECTLY IN PROXY THREAD } catch (const std::exception &e) { - // Raise these to error levels: you really shouldn't be doing anything - // complicated in an in-proxy completion function! + // Raise these to error levels: the caller really shouldn't be doing + // anything non-trivial in an in-proxy completion function! LMQ_LOG(error, "proxy thread caught exception when processing in-proxy completion command: ", e.what()); } catch (...) { LMQ_LOG(error, "proxy thread caught non-standard exception when processing in-proxy completion command"); @@ -1306,7 +1312,7 @@ bool LokiMQ::proxy_handle_builtin(size_t conn_index, std::vector data.reserve(parts.size() - (tag_pos + 1)); for (auto it = parts.begin() + (tag_pos + 1); it != parts.end(); ++it) data.emplace_back(view(*it)); - proxy_schedule_job([callback=std::move(it->second.second), data=std::move(data)] { + proxy_schedule_reply_job([callback=std::move(it->second.second), data=std::move(data)] { callback(true, std::move(data)); }); pending_requests.erase(it); @@ -1341,7 +1347,7 @@ bool LokiMQ::proxy_handle_builtin(size_t conn_index, std::vector } LMQ_LOG(info, "Got initial HELLO server response from ", peer_address(parts.back())); - proxy_schedule_job([on_success=std::move(std::get(pc)), + proxy_schedule_reply_job([on_success=std::move(std::get(pc)), conn=conn_index_to_id[conn_index]] { on_success(conn); }); @@ -1383,36 +1389,56 @@ LokiMQ::run_info& LokiMQ::get_idle_worker() { return workers[id]; } -void LokiMQ::set_batch_threads(unsigned int threads) { +void LokiMQ::set_batch_threads(int threads) { if (proxy_thread.joinable()) throw std::logic_error("Cannot change reserved batch threads after calling `start()`"); + if (threads < -1) // -1 is the default which is based on general threads + throw std::out_of_range("Invalid set_batch_threads() value " + std::to_string(threads)); batch_jobs_reserved = threads; } -void LokiMQ::set_general_threads(unsigned int threads) { +void LokiMQ::set_reply_threads(int threads) { + if (proxy_thread.joinable()) + throw std::logic_error("Cannot change reserved reply threads after calling `start()`"); + if (threads < -1) // -1 is the default which is based on general threads + throw std::out_of_range("Invalid set_reply_threads() value " + std::to_string(threads)); + reply_jobs_reserved = threads; +} + +void LokiMQ::set_general_threads(int threads) { if (proxy_thread.joinable()) throw std::logic_error("Cannot change general thread count after calling `start()`"); + if (threads < 1) + throw std::out_of_range("Invalid set_general_threads() value " + std::to_string(threads) + ": general threads must be > 0"); general_workers = threads; } -LokiMQ::run_info& LokiMQ::run_info::operator=(pending_command&& pending) { +LokiMQ::run_info& LokiMQ::run_info::load(category* cat_, std::string command_, ConnectionID conn_, std::string route_, + std::vector data_parts_, const std::pair* callback_) { is_batch_job = false; - cat = &pending.cat; - command = std::move(pending.command); - conn = std::move(pending.conn); - conn_route = std::move(pending.conn_route); - data_parts = std::move(pending.data_parts); - callback = pending.callback; + is_reply_job = false; + cat = cat_; + command = std::move(command_); + conn = std::move(conn_); + conn_route = std::move(route_); + data_parts = std::move(data_parts_); + callback = callback_; return *this; } -LokiMQ::run_info& LokiMQ::run_info::operator=(batch_job&& bj) { + +LokiMQ::run_info& LokiMQ::run_info::load(pending_command&& pending) { + return load(&pending.cat, std::move(pending.command), std::move(pending.conn), + std::move(pending.conn_route), std::move(pending.data_parts), pending.callback); +} + +LokiMQ::run_info& LokiMQ::run_info::load(batch_job&& bj, bool reply_job) { is_batch_job = true; + is_reply_job = reply_job; batch_jobno = bj.second; batch = bj.first; return *this; } - void LokiMQ::proxy_run_worker(run_info& run) { if (!run.worker_thread.joinable()) run.worker_thread = std::thread{&LokiMQ::worker_thread, this, run.worker_id}; @@ -1420,21 +1446,29 @@ void LokiMQ::proxy_run_worker(run_info& run) { send_routed_message(workers_socket, run.worker_routing_id, "RUN"); } +void LokiMQ::proxy_run_batch_jobs(std::queue& jobs, const int reserved, int& active, bool reply) { + while (!jobs.empty() && + (active < reserved || static_cast(workers.size() - idle_workers.size()) < general_workers)) { + proxy_run_worker(get_idle_worker().load(std::move(jobs.front()), reply)); + jobs.pop(); + active++; + } +} void LokiMQ::proxy_process_queue() { - // First up: process any batch jobs; since these are internally they are given higher priority. - while (!batch_jobs.empty() && - (batch_jobs_active < batch_jobs_reserved || workers.size() - idle_workers.size() < general_workers)) { - proxy_run_worker(get_idle_worker() = std::move(batch_jobs.front())); - batch_jobs.pop(); - batch_jobs_active++; - } + // First up: process any batch jobs; since these are internal they are given higher priority. + proxy_run_batch_jobs(batch_jobs, batch_jobs_reserved, batch_jobs_active, false); + // Next any reply batch jobs (which are a bit different from the above, since they are + // externally triggered but for things we initiated locally). + proxy_run_batch_jobs(reply_jobs, reply_jobs_reserved, reply_jobs_active, true); + + // Finally general incoming commands for (auto it = pending_commands.begin(); it != pending_commands.end() && active_workers() < max_workers; ) { auto& pending = *it; if (pending.cat.active_threads < pending.cat.reserved_threads || active_workers() < general_workers) { - proxy_run_worker(get_idle_worker() = std::move(pending)); + proxy_run_worker(get_idle_worker().load(std::move(pending))); pending.cat.queued--; pending.cat.active_threads++; assert(pending.cat.queued >= 0); @@ -1530,24 +1564,14 @@ void LokiMQ::proxy_to_worker(size_t conn_index, std::vector& par } auto& run = get_idle_worker(); - run.is_batch_job = false; - run.cat = &category; - run.command = std::move(command); - run.conn.pk = peer->pubkey; - if (peer->service_node) { - run.conn.id = ConnectionID::SN_ID; - run.conn_route.clear(); - } else { - run.conn.id = conn_index_to_id[conn_index].id; - if (outgoing) - run.conn_route.clear(); - else - run.conn_route = tmp_peer.route; + { + ConnectionID c{peer->service_node ? ConnectionID::SN_ID : conn_index_to_id[conn_index].id, peer->pubkey}; + if (outgoing || peer->service_node) + tmp_peer.route.clear(); + run.load(&category, std::move(command), std::move(c), std::move(tmp_peer.route), + std::move(data_parts), cat_call.second); } - run.data_parts = std::move(data_parts); - run.callback = cat_call.second; - if (outgoing) peer->activity(); // outgoing connection activity, pump the activity timer @@ -1796,7 +1820,7 @@ void LokiMQ::proxy_connect_remote(bt_dict_consumer data) { setup_outgoing_socket(sock, remote_pubkey); sock.connect(remote); } catch (const zmq::error_t &e) { - proxy_schedule_job([conn_id, on_failure=std::move(on_failure), what="connect() failed: "s+e.what()] { + proxy_schedule_reply_job([conn_id, on_failure=std::move(on_failure), what="connect() failed: "s+e.what()] { on_failure(conn_id, std::move(what)); }); return; diff --git a/lokimq/lokimq.h b/lokimq/lokimq.h index b8f341f..9bf3186 100644 --- a/lokimq/lokimq.h +++ b/lokimq/lokimq.h @@ -459,23 +459,23 @@ private: std::vector idle_workers; /// Maximum number of general task workers, specified by g`/during construction - unsigned int general_workers = std::thread::hardware_concurrency(); + int general_workers = std::max(1, std::thread::hardware_concurrency()); /// Maximum number of possible worker threads we can have. This is calculated when starting, /// and equals general_workers plus the sum of all categories' reserved threads counts plus the /// reserved batch workers count. This is also used to signal a shutdown; we set it to 0 when /// quitting. - unsigned int max_workers; + int max_workers; /// Number of active workers - unsigned int active_workers() const { return workers.size() - idle_workers.size(); } + int active_workers() const { return workers.size() - idle_workers.size(); } /// Worker thread loop void worker_thread(unsigned int index); /// If set, skip polling for one proxy loop iteration (set when we know we have something /// processible without having to shove it onto a socket, such as scheduling an internal job). - bool proxy_skip_poll = false; + bool proxy_skip_one_poll = false; /// Does the proxying work void proxy_loop(); @@ -486,7 +486,7 @@ private: void proxy_process_queue(); - Batch* proxy_schedule_job(std::function f); + void proxy_schedule_reply_job(std::function f); /// Looks up a peers element given a connect index (for outgoing connections where we already /// knew the pubkey and SN status) or an incoming zmq message (which has the pubkey and sn @@ -567,13 +567,17 @@ private: /// weaker (i.e. it cannot reconnect to the SN if the connection is no longer open). void proxy_reply(bt_dict_consumer data); - /// Currently active batches. + /// Currently active batch/reply jobs; this is the container that owns the Batch instances std::unordered_set batches; /// Individual batch jobs waiting to run using batch_job = std::pair; - std::queue batch_jobs; - unsigned int batch_jobs_active = 0; - unsigned int batch_jobs_reserved = std::max((std::thread::hardware_concurrency() + 1) / 2, 1u); + std::queue batch_jobs, reply_jobs; + int batch_jobs_active = 0; + int reply_jobs_active = 0; + int batch_jobs_reserved = -1; + int reply_jobs_reserved = -1; + /// Runs any queued batch jobs + void proxy_run_batch_jobs(std::queue& jobs, int reserved, int& active, bool reply); /// BATCH command. Called with a Batch (see lokimq/batch.h) object pointer for the proxy to /// take over and queue batch jobs. @@ -662,6 +666,7 @@ private: /// transient data we are passing into the thread. struct run_info { bool is_batch_job = false; + bool is_reply_job = false; // If is_batch_job is false then these will be set appropriate (if is_batch_job is true then // these shouldn't be accessed and likely contain stale data). @@ -684,10 +689,14 @@ private: size_t worker_id; // The index in `workers` std::string worker_routing_id; // "w123" where 123 == worker_id - /// Loads the run info with a pending command - run_info& operator=(pending_command&& pending); + /// Loads the run info with an incoming command + run_info& load(category* cat, std::string command, ConnectionID conn, std::string route, + std::vector data_parts, const std::pair* callback); + + /// Loads the run info with a stored pending command + run_info& load(pending_command&& pending); /// Loads the run info with a batch job - run_info& operator=(batch_job&& bj); + run_info& load(batch_job&& bj, bool reply_job = false); }; /// Data passed to workers for the RUN command. The proxy thread sets elements in this before /// sending RUN to a worker then the worker uses it to get call info, and only allocates it @@ -825,17 +834,28 @@ public: void add_command_alias(std::string from, std::string to); /** - * Sets the number of worker threads reserved for batch jobs. If not called this defaults to - * half the number of hardware threads available (rounded up). This works exactly like - * reserved_threads for a category, but allows to batch jobs. See category for details. + * Sets the number of worker threads reserved for batch jobs. If not explicitly called then + * this defaults to half the general worker threads configured (rounded up). This works exactly + * like reserved_threads for a category, but allows to batch jobs. See category for details. * * Note that some internal jobs are counted as batch jobs: in particular timers added via - * add_timer() and replies received in response to request commands currently each take a batch - * job slot when invoked. + * add_timer() are scheduled as batch jobs. * * Cannot be called after start()ing the LokiMQ instance. */ - void set_batch_threads(unsigned int threads); + void set_batch_threads(int threads); + + /** + * Sets the number of worker threads reserved for handling replies from servers; this is + * mostly for responses to `request()` calls, but also gets used for other network-related + * events such as the ConnectSuccess/ConnectFailure callbacks for establishing remote non-SN + * connections. + * + * Defaults to one-eighth of the number of configured general threads, rounded up. + * + * Cannot be changed after start()ing the LokiMQ instance. + */ + void set_reply_threads(int threads); /** * Sets the number of general worker threads. This is the target number of threads to run that @@ -844,9 +864,13 @@ public: * reserved threads can create threads in addition to the amount specified here if necessary to * fulfill the reserved threads count for the category. * + * Adjusting this also adjusts the default values of batch and reply threads, above. + * + * Defaults to `std::thread::hardware_concurrency()`. + * * Cannot be called after start()ing the LokiMQ instance. */ - void set_general_threads(unsigned int threads); + void set_general_threads(int threads); /** * Finish starting up: binds to the bind locations given in the constructor and launches the