From f75b6cf2217dda361c394acf9ff5be2c4fb08a1e Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Wed, 5 Feb 2020 20:21:27 -0400 Subject: [PATCH] Added batch job implementation + various improvements This overhauls the proposed batch implementation (described in the README but previously not implemented) and implements it. Various other minor improvements and code restructuring. Added a proposed "request" type to the README to be implemented; this is like a command, but always expects a ["REPLY", TAG, ...] response. The request invoker provides a callback to invoke when such a REPLY arrives. --- README.md | 146 +++++++++---- TODO.txt | 6 + lokimq/batch.h | 234 +++++++++++++++++++++ lokimq/bt_serialize.h | 6 +- lokimq/lokimq.cpp | 475 ++++++++++++++++++++++++++---------------- lokimq/lokimq.h | 170 ++++++++++++--- 6 files changed, 786 insertions(+), 251 deletions(-) create mode 100644 TODO.txt create mode 100644 lokimq/batch.h diff --git a/README.md b/README.md index 404100e..b390732 100644 --- a/README.md +++ b/README.md @@ -17,18 +17,19 @@ much better performing and more scalable) see the ZMQ guide documentation on the ## Basic message structure -LokiMQ messages consist of 1+ part messages where the first part is a string command and remaining -parts are command-specific data. +LokiMQ messages come in two fundamental forms: "commands", consisting of a command name and +optional arguments, and "requests", consisting of a request name, a request tag, and optional +arguments. -The command string is one of two types: +The command/request string is one of two types: -`basic` - for basic commands such as authentication (`login`) handled by LokiMQ itself. These -commands may not contain a `.`, and are reserved for LokiMQ itself. +`BASIC` - for basic requests such as authentication (`LOGIN`) handled by LokiMQ itself. These +commands may not contain a `.`, and are handled by LokiMQ itself. -`category.command` - for commands registered by the LokiMQ caller (e.g. lokid). 
Here `category` -must be at least one character not containing a `.` and `command` may be anything. These categories -and commands are registered according to general function and authentication level (more on this -below). For example, for lokid categories are: +`category.command` - for commands/requests registered by the LokiMQ caller (e.g. lokid). Here +`category` must be at least one character not containing a `.` and `command` may be anything. These +categories and commands are registered according to general function and authentication level (more +on this below). For example, for lokid categories are: - `system` - is for RPC commands related to the system administration such as mining, getting sensitive statistics, accessing SN private keys, remote shutdown, etc. @@ -38,6 +39,71 @@ below). For example, for lokid categories are: - `blockchain` - is for remote blockchain access such as retrieving blocks and transactions as well as subscribing to updates for new blocks, transactions, and service node states. +The difference between a request and a command is that a request includes an additional opaque tag +value which is used to identify a reply. For example you could register a `general.backwards` +request that takes a string and receives a reply containing that string reversed. When invoking +the request via LokiMQ you provide a callback to be invoked when the reply arrives. On the wire +this looks like: + + <<< [general.backwards] [v71.&a] [hello world] + >>> [REPLY] [v71.&a] [dlrow olleh] + +where each [] denotes a message part and `v71.&a` is a unique randomly generated identifier handled +by LokiMQ (both the invoker and the recipient code only see the `hello world`/`dlrow olleh` message +parts). + +In contrast, regular registered commands have no identifier or expected reply callback. For example +you could register a `general.pong` command that takes an argument and prints it out. 
So requests +and output would look like this: + + >>> [general.pong] [hi] + hi + >>> [general.pong] [there] + there + +You could also create a `ping` command that instructs someone to pong you with a random word -- i.e. +give them a ping and they send you a pong: + + <<< [general.ping] + >>> [general.pong] [omg] + omg + +Although this *looks* like a reply it isn't quite the same because there is no connection between +the ping and the pong (and, as above, pongs can be issued directly). In particular this means if +you send multiple pings to the same recipient: + + <<< [general.ping] + <<< [general.ping] + >>> [general.pong] [world] + >>> [general.pong] [hello] + +you would have no way to know whether the first pong is in reply to the first or second ping. We +could amend this to include a number to be echoed back: + + <<< [general.ping] [1] + <<< [general.ping] [2] + >>> [general.pong] [2] [world] + >>> [general.pong] [1] [hello] + +and now, in the pong, we could keep track of which number goes with which outgoing ping. This is +the basic idea behind using a request instead of a command, except that you don't register the `pong` +command at all (there is a generic "REPLY" command for all replies), and the index values are +handled for you transparently. + +## Command arguments + +Optional command/request arguments are always strings on the wire. The LokiMQ-using developer is +free to create whatever encoding she wants, and these can vary across commands. For example +`wallet.tx` might be a request that returns a transaction in binary, while `wallet.tx_info` might +return tx metadata in JSON, and `p2p.send_tx` might encode tx data and metadata in a bt-encoded +data string. + +No structure at all is imposed on message data to allow maximum flexibility; it is entirely up to +the calling code to handle all encoding/decoding duties. + +Internal commands passed between LokiMQ-managed threads use either plain strings or bt-encoded +dictionaries. 
See `lokimq/bt_serialize.h` if you want a bt serializer/deserializer. + ## Command invocation The application registers categories and registers commands within these categories with callbacks. @@ -229,41 +295,51 @@ For example, the following example shows how you might use it to convert from in to some other output value: ```C++ -struct task_data { int input; double result; }; +double do_my_task(int input) { + if (input % 10 == 7) + throw std::domain_error("I don't do '7s, sorry"); + if (input == 1) + return 5.0; + return 3.0 * input; +} -// Called for each job. -void do_my_task(void* in) { - auto& x = *static_cast(in); - x.result = 42.0 * x.input; // Job +void continue_big_task(std::vector> results) { + double sum = 0; + for (auto& r : results) { + try { + sum += r.get(); + } catch (const std::exception& e) { + std::cout << "Oh noes! " << e.what() << "\n"; + } + } + std::cout << "All done, sum = " << sum << "\n"; + + // Output: + // Oh noes! I don't do '7s, sorry + // Oh noes! I don't do '7s, sorry + // Oh noes! I don't do '7s, sorry + // All done, sum = 1337 } void start_big_task() { - // ... Before code ... + size_t num_jobs = 32; - auto* results = new std::vector{50}; + lokimq::Batch batch; + batch.reserve(num_jobs); - lokimq::Batch batch; - for (size_t i = 0; i < results->size(); i++) { - auto* r = (*result)[i]; - r->input = i; - batch.add_job(&do_my_task, r); - } - lmq.job(batch, &continue_big_task, results); + for (size_t i = 0; i < num_jobs; i++) + batch.add_job([i]() { return do_my_task(i); }); + + batch.completion(&continue_big_task); + + lmq.batch(std::move(batch)); // ... to be continued in `continue_big_task` after all the jobs finish -} -// This will be called once all the `do_my_task` calls have completed. (Note that we could be in -// a different thread from the one `start_big_task()` was running in). 
-void continue_big_task(void* rptr) { - // Put into a unique_ptr to deal with ownership - std::unique_ptr> results{static_cast*>(rptr)}; - double sum = 0; - for (auto &r : results) sum += r; - std::cout << "All done, sum = " << sum << "\n"; + // Can do other things here, but note that continue_big_task could run + // *before* anything else here finishes. } ``` This code deliberately does not support blocking to wait for the tasks to finish: if you want such a -bad design you can implement it yourself; LokiMQ isn't going to help you hurt yourself. - - +poor design (which is a recipe for deadlocks, imagine jobs queuing other jobs and then waiting) you +can implement it yourself; LokiMQ isn't going to help you hurt yourself like that. diff --git a/TODO.txt b/TODO.txt new file mode 100644 index 0000000..9ec8511 --- /dev/null +++ b/TODO.txt @@ -0,0 +1,6 @@ + +- split out proxy code & data into a private lokimq/proxy.h header so that the main header doesn't + need to include so much. + +- timed, i.e. schedule this job to run in X time. This requires dynamically adjusting the polling + timeout to use the current value *or* the next timer (if sooner). diff --git a/lokimq/batch.h b/lokimq/batch.h new file mode 100644 index 0000000..6f621fa --- /dev/null +++ b/lokimq/batch.h @@ -0,0 +1,234 @@ +// Copyright (c) 2020, The Loki Project +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. 
Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#pragma once +#include +#include +#include +#include "lokimq.h" + +namespace lokimq { + +namespace detail { + +enum class BatchStatus { + running, // there are still jobs to run (or running) + complete, // the batch is complete but still has a completion function to call + done // the batch is complete and has no completion function +}; + +// Virtual base class for Batch +class Batch { +public: + // Returns the number of jobs in this batch + virtual size_t size() const = 0; + // Called in a worker thread to run the job + virtual void run_job(int i) = 0; + // Called in the main proxy thread when the worker returns from finishing a job. The return + // value tells us whether the current finishing job finishes off the batch: `running` to tell us + // there are more jobs; `complete` to tell us that the jobs are done but the completion function + // needs to be called; and `done` to signal that the jobs are done and there is no completion + // function. 
+ virtual BatchStatus job_finished() = 0; + // Called by a worker; not scheduled until all jobs are done. + virtual void job_completion() = 0; + + virtual ~Batch() = default; +}; + +} + +/** + * Simple class that can either hold a result or an exception and retrieves the result (or raises + * the exception) via a .get() method. + * + * This is designed to be like a very stripped down version of a std::promise/std::future pair. We + * reimplemented it, however, because by ditching all the thread synchronization that promise/future + * guarantees we can substantially reduce call overhead (by a factor of ~8 according to benchmarking + * code). Since LokiMQ's proxy<->worker communication channel already gives us thread + * synchronization, that overhead would just be wasted. + * + * @tparam R the value type held by the result; must be default constructible. Note, however, that + * there are specializations provided for lvalue reference types and `void` (which obviously don't + * satisfy this). + */ +template +class job_result { + R value; + std::exception_ptr exc; + +public: + /// Sets the value. Should be called only once, or not at all if set_exception was called. + void set_value(R&& v) { value = std::move(v); } + + /// Sets the exception, which will be rethrown when `get()` is called. Should be called + /// only once, or not at all if set_value() was called. + void set_exception(std::exception_ptr e) { exc = std::move(e); } + + /// Retrieves the value. If an exception was set instead of a value then that exception is + /// thrown instead. Note that the held value is moved out of this object, so you should + /// not call this multiple times. 
+ R get() { + if (exc) std::rethrow_exception(exc); + return std::move(value); + } +}; + +/** job_result specialization for reference types */ +template +class job_result::value>> { + std::remove_reference_t* value_ptr; + std::exception_ptr exc; + +public: + void set_value(R v) { value_ptr = &v; } + void set_exception(std::exception_ptr e) { exc = std::move(e); } + R get() { + if (exc) std::rethrow_exception(exc); + return *value_ptr; + } +}; + +/** job_result specialization for void; there is no value, but exceptions are still captured + * (rethrown when `get()` is called). + */ +template<> +class job_result { + std::exception_ptr exc; + +public: + void set_exception(std::exception_ptr e) { exc = std::move(e); } + // Returns nothing, but rethrows if there is a captured exception. + void get() { if (exc) std::rethrow_exception(exc); } +}; + +/// Helper class used to set up batches of jobs to be scheduled via the lokimq job handler. +/// +/// @tparam R - the return type of the individual jobs +/// +template +class Batch final : private detail::Batch { + friend class LokiMQ; +public: + /// The completion function type, called after all jobs have finished. + using CompletionFunc = std::function> results)>; + + // Default constructor + Batch() = default; + + // movable + Batch(Batch&&) = default; + Batch &operator=(Batch&&) = default; + + // non-copyable + Batch(const Batch&) = delete; + Batch &operator=(const Batch&) = delete; + +private: + std::vector> jobs; + std::vector> results; + CompletionFunc complete; + std::size_t jobs_outstanding = 0; + bool started = false; + + void check_not_started() { + if (started) + throw std::logic_error("Cannot add jobs or completion function after starting a lokimq::Batch!"); + } + +public: + /// Preallocates space in the internal vector that stores jobs. + void reserve(std::size_t num) { + jobs.reserve(num); + results.reserve(num); + } + + /// Adds a job. 
This takes any callable object that is invoked with no arguments and returns R + /// (the Batch return type). The tasks will be scheduled and run when the next worker thread is + /// available. The called function may throw exceptions (which will be propagated to the + /// completion function through the job_result values). There is no guarantee on the order of + /// invocation of the jobs. + void add_job(std::function job) { + check_not_started(); + jobs.emplace_back(std::move(job)); + results.emplace_back(); + jobs_outstanding++; + } + + /// Sets the completion function to invoke after all jobs have finished. If this is not set + /// then jobs simply run and results are discarded. + void completion(CompletionFunc comp) { + check_not_started(); + complete = std::move(comp); + } + +private: + + std::size_t size() const override { + return jobs.size(); + } + + template + void set_value(job_result& r, std::function& f) { r.set_value(f()); } + void set_value(job_result&, std::function& f) { f(); } + + void run_job(const int i) override { + // called by worker thread + auto& r = results[i]; + try { + set_value(r, jobs[i]); + } catch (...) { + r.set_exception(std::current_exception()); + } + } + + detail::BatchStatus job_finished() override { + --jobs_outstanding; + if (jobs_outstanding) + return detail::BatchStatus::running; + if (complete) + return detail::BatchStatus::complete; + return detail::BatchStatus::done; + } + + void job_completion() override { + return complete(std::move(results)); + } +}; + + +template +void LokiMQ::batch(Batch&& batch) { + if (batch.size() == 0) + throw std::logic_error("Cannot batch a job batch with 0 jobs"); + // Need to send this over to the proxy thread via the base class pointer. It assumes ownership. 
+ auto* baseptr = static_cast(new Batch(std::move(batch))); + detail::send_control(get_control_socket(), "BATCH", bt_serialize(reinterpret_cast(baseptr))); +} + +} diff --git a/lokimq/bt_serialize.h b/lokimq/bt_serialize.h index eb761f5..46790ed 100644 --- a/lokimq/bt_serialize.h +++ b/lokimq/bt_serialize.h @@ -131,9 +131,9 @@ inline void bt_need_more(const string_view &s) { union maybe_signed_int64_t { int64_t i64; uint64_t u64; }; -/// Deserializes a signed or unsigned 64-bit integer from an input stream. Sets the second bool to -/// true iff the value is int64_t because a negative value was read. Throws an exception if the -/// read value doesn't fit in a int64_t (if negative) or a uint64_t (if positive). Removes consumed +/// Deserializes a signed or unsigned 64-bit integer from a string. Sets the second bool to true +/// iff the value is int64_t because a negative value was read. Throws an exception if the read +/// value doesn't fit in a int64_t (if negative) or a uint64_t (if positive). Removes consumed /// characters from the string_view. std::pair bt_deserialize_integer(string_view& s); diff --git a/lokimq/lokimq.cpp b/lokimq/lokimq.cpp index 000e39e..708ecf2 100644 --- a/lokimq/lokimq.cpp +++ b/lokimq/lokimq.cpp @@ -5,6 +5,7 @@ extern "C" { #include } #include "lokimq.h" +#include "batch.h" #include "hex.h" namespace lokimq { @@ -141,7 +142,7 @@ std::string zmtp_metadata(string_view key, string_view value) { } void check_not_started(const std::thread& proxy_thread) { - if (proxy_thread.get_id() != std::thread::id{}) + if (proxy_thread.joinable()) throw std::logic_error("Cannot add categories/commands/aliases after calling `start()`"); } @@ -168,14 +169,18 @@ std::string to_string(AuthLevel a) { /// Extracts a pubkey and SN status from a zmq message properties. Throws on failure. 
void extract_pubkey(zmq::message_t& msg, std::string& pubkey, bool& service_node) { string_view pubkey_hex{msg.gets("User-Id")}; - // The ZAP handler sets the User-Id to S: (service node) or C: (non-SN) followed by the - // pubkey. - if (pubkey_hex.size() != 66 || (pubkey_hex[0] != 'S' && pubkey_hex[0] != 'C') || pubkey_hex[1] != ':') + if (pubkey_hex.size() != 64) throw std::logic_error("bad user-id"); - assert(is_hex(pubkey_hex.begin() + 2, pubkey_hex.end())); + assert(is_hex(pubkey_hex.begin(), pubkey_hex.end())); pubkey.reserve(32); from_hex(pubkey_hex.begin() + 2, pubkey_hex.end(), std::back_inserter(pubkey)); - service_node = pubkey_hex[0] == 'S'; + + service_node = false; + try { + string_view is_sn{msg.gets("X-SN")}; + if (is_sn.size() == 1 && is_sn[0] == '1') + service_node = true; + } catch (...) { /* property not set, ignore */ } } const char* peer_address(zmq::message_t& msg) { @@ -330,11 +335,10 @@ LokiMQ::LokiMQ( std::vector bind_, SNRemoteAddress lookup, AllowFunc allow, - Logger logger, - unsigned int general_workers) + Logger logger) : object_id{next_id++}, pubkey{std::move(pubkey_)}, privkey{std::move(privkey_)}, local_service_node{service_node}, bind{std::move(bind_)}, peer_lookup{std::move(lookup)}, allow_connection{std::move(allow)}, logger{logger}, - poll_remote_offset{poll_internal_size + (bind.empty() ? 0 : 1)}, general_workers{general_workers} { + poll_remote_offset{poll_internal_size + (bind.empty() ? 0 : 1)} { LMQ_LOG(trace, "Constructing listening LokiMQ, id=", object_id, ", this=", this); @@ -369,7 +373,7 @@ LokiMQ::LokiMQ( } void LokiMQ::start() { - if (proxy_thread.get_id() != std::thread::id{}) + if (proxy_thread.joinable()) throw std::logic_error("Cannot call start() multiple times!"); LMQ_LOG(info, "Initializing LokiMQ ", bind.empty() ? 
"remote-only" : "listener", " with pubkey ", to_hex(pubkey)); @@ -409,18 +413,28 @@ void LokiMQ::worker_thread(unsigned int index) { Message message{*this}; std::vector parts; - run_info& run = workers[index]; // This is our first job, and will be updated later with subsequent jobs + run_info& run = workers[index]; // This contains our first job, and will be updated later with subsequent jobs while (true) { try { - message.pubkey = {run.pubkey.data(), 32}; - message.service_node = run.service_node; - message.data.clear(); - for (auto& m : run.message_parts) - message.data.emplace_back(m.data(), m.size()); + if (run.is_batch_job) { + if (run.batch_jobno >= 0) { + LMQ_LOG(trace, "worker thread ", worker_id, " running batch ", run.batch, "#", run.batch_jobno); + run.batch->run_job(run.batch_jobno); + } else if (run.batch_jobno == -1) { + LMQ_LOG(trace, "worker thread ", worker_id, " running batch ", run.batch, " completion"); + run.batch->job_completion(); + } + } else { + message.pubkey = {run.pubkey.data(), 32}; + message.service_node = run.service_node; + message.data.clear(); + for (auto& m : run.data_parts) + message.data.emplace_back(m.data(), m.size()); - LMQ_LOG(trace, "worker thread ", worker_id, " invoking ", run.command, " callback with ", message.data.size(), " message parts"); - (*run.callback)(message); + LMQ_LOG(trace, "worker thread ", worker_id, " invoking ", run.command, " callback with ", message.data.size(), " message parts"); + (*run.callback)(message); + } /* * FIXME: BYE should be handled by the proxy thread, not the worker. 
@@ -428,7 +442,7 @@ void LokiMQ::worker_thread(unsigned int index) { /* if (msg.command == "BYE") { LMQ_LOG(info, "peer asked us to disconnect"); - detail::send_control(get_control_socket(), "DISCONNECT", {{"pubkey",msg.pubkey}}); + detail::send_control(get_control_socket(), "DISCONNECT", msg.pubkey); continue; } */ @@ -453,7 +467,7 @@ void LokiMQ::worker_thread(unsigned int index) { LMQ_LOG(warn, worker_id, "/", object_id, " received disallowed ", cmd_type, " command ", msg.command << " from " << (msg.sn ? "non-" : "") << "SN remote " << to_hex(msg.pubkey) << "; replying with a BYE"); send(msg.pubkey, "BYE", send_option::incoming{}); - detail::send_control(get_control_socket(), "DISCONNECT", {{"pubkey",msg.pubkey}}); + detail::send_control(get_control_socket(), "DISCONNECT", msg.pubkey); continue; } @@ -478,7 +492,7 @@ void LokiMQ::worker_thread(unsigned int index) { while (true) { // Signal that we are ready for another job and wait for it. (We do this down here // because our first job gets set up when the thread is started). 
- detail::send_control(sock, "READY"); + detail::send_control(sock, "RAN"); LMQ_LOG(trace, "worker ", worker_id, " waiting for requests"); parts.clear(); recv_message_parts(sock, std::back_inserter(parts)); @@ -675,16 +689,31 @@ void LokiMQ::proxy_reply(bt_dict &&data) { } } +void LokiMQ::proxy_batch(detail::Batch* batchptr) { + auto& batch = *batches.emplace(batchptr).first; + const int jobs = batch->size(); + for (int i = 0; i < jobs; i++) + batch_jobs.emplace(batch, i); +} + void LokiMQ::proxy_control_message(std::vector parts) { if (parts.size() < 2 || parts.size() > 3) throw std::logic_error("Expected 2-3 message parts for a proxy control message"); auto route = view(parts[0]), cmd = view(parts[1]); - bt_dict data; - if (parts.size() > 2) { - bt_deserialize(view(parts[2]), data); - } LMQ_LOG(trace, "control message: ", cmd); - if (cmd == "START") { + if (cmd == "SEND") { + LMQ_LOG(trace, "proxying message"); + proxy_send(bt_deserialize(view(parts.at(2)))); + } else if (cmd == "REPLY") { + LMQ_LOG(trace, "proxying reply to non-SN incoming message"); + proxy_reply(bt_deserialize(view(parts.at(2)))); + } else if (cmd == "BATCH") { + LMQ_LOG(trace, "proxy batch jobs"); + auto ptrval = bt_deserialize(view(parts.at(2))); + proxy_batch(reinterpret_cast(ptrval)); + } else if (cmd == "CONNECT") { + proxy_connect(bt_deserialize(view(parts.at(2)))); + } else if (cmd == "START") { // Command send by the owning thread during startup; we send back a simple READY reply to // let it know we are running. 
route_control(command, route, "READY"); @@ -696,16 +725,6 @@ void LokiMQ::proxy_control_message(std::vector parts) { for (const auto &route : idle_workers) route_control(workers_socket, workers[route].routing_id, "QUIT"); idle_workers.clear(); - } else if (cmd == "CONNECT") { - proxy_connect(std::move(data)); - } else if (cmd == "DISCONNECT") { - proxy_disconnect(data.at("pubkey").get()); - } else if (cmd == "SEND") { - LMQ_LOG(trace, "proxying message to ", to_hex(data.at("pubkey").get())); - proxy_send(std::move(data)); - } else if (cmd == "REPLY") { - LMQ_LOG(trace, "proxying reply to non-SN incoming message"); - proxy_reply(std::move(data)); } else { throw std::runtime_error("Proxy received invalid control command: " + std::string{cmd} + " (" + std::to_string(parts.size()) + ")"); @@ -771,12 +790,19 @@ void LokiMQ::proxy_loop() { workers_socket.setsockopt(ZMQ_ROUTER_MANDATORY, 1); workers_socket.bind(SN_ADDR_WORKERS); - if (!general_workers) - general_workers = std::thread::hardware_concurrency(); + if (general_workers == 0) + general_workers = std::max(std::thread::hardware_concurrency(), 1u); - max_workers = general_workers; - for (const auto& cat : categories) + max_workers = general_workers + batch_jobs_reserved; + for (const auto& cat : categories) { max_workers += cat.second.reserved_threads; + } + + if (log_level() >= LogLevel::trace) { + LMQ_LOG(trace, "Reserving space for ", max_workers, " max workers = ", general_workers, " general + category reserved:"); + for (const auto& cat : categories) + LMQ_LOG(trace, " - ", cat.first, ": ", cat.second.reserved_threads); + } workers.reserve(max_workers); if (!workers.empty()) @@ -817,13 +843,11 @@ void LokiMQ::proxy_loop() { constexpr auto timeout_check_interval = 10000ms; // Minimum time before for checking for connections to close since the last check auto last_conn_timeout = std::chrono::steady_clock::now(); - size_t last_conn_index = 0; // Index of the connection we last received a message from; see below - 
std::vector parts; while (true) { if (max_workers == 0) { // Will be 0 only if we are quitting - if (workers.empty()) { + if (std::none_of(workers.begin(), workers.end(), [](auto &w) { return w.thread.joinable(); })) { // All the workers have finished, so we can finish shutting down return proxy_quit(); } @@ -833,8 +857,7 @@ void LokiMQ::proxy_loop() { // available worker room then also poll incoming connections and outgoing connections for // messages to forward to a worker. Otherwise, we just look for a control message or a // worker coming back with a ready message. - bool workers_available = idle_workers.size() > 0 || workers.size() < max_workers; - zmq::poll(pollitems.data(), workers_available ? pollitems.size() : poll_internal_size, poll_timeout); + zmq::poll(pollitems.data(), pollitems.size(), poll_timeout); LMQ_LOG(trace, "processing control messages"); // Retrieve any waiting incoming control messages @@ -843,81 +866,48 @@ void LokiMQ::proxy_loop() { } LMQ_LOG(trace, "processing worker messages"); - // Process messages sent by workers for (parts.clear(); recv_message_parts(workers_socket, std::back_inserter(parts), zmq::recv_flags::dontwait); parts.clear()) { - if (parts.size() != 2) { - LMQ_LOG(error, "Received send invalid ", parts.size(), "-part message"); - continue; - } - auto route = view(parts[0]), cmd = view(parts[1]); - assert(route.size() >= 2 && route[0] == 'w' && route[1] >= '1' && route[1] <= '9'); - string_view worker_id_str{&route[1], route.size()-1}; // Chop off the leading "w" - unsigned int worker_id = detail::extract_unsigned(worker_id_str); - if (!worker_id_str.empty() /* didn't consume everything */ || worker_id >= workers.size()) { - LMQ_LOG(error, "Worker id '", route, "' is invalid, unable to process worker command"); - continue; - } - - LMQ_LOG(trace, "received ", cmd, " command from ", route); - if (cmd == "READY") { - LMQ_LOG(debug, "Worker ", route, " is ready"); - if (max_workers == 0) { // Shutting down - LMQ_LOG(trace, 
"Telling worker ", route, " to quit"); - route_control(workers_socket, route, "QUIT"); - } else { - idle_workers.push_back(worker_id); - } - } else if (cmd == "QUITTING") { - workers[worker_id].thread.join(); - LMQ_LOG(debug, "Worker ", route, " exited normally"); - } else { - LMQ_LOG(error, "Worker ", route, " sent unknown control message: `", cmd, "'"); - } + proxy_worker_message(parts); } // Handle any zap authentication LMQ_LOG(trace, "processing zap requests"); process_zap_requests(zap_auth); - workers_available = idle_workers.size() > 0 || workers.size() < max_workers; // recheck - idle_workers could have changed above - if (max_workers > 0 && workers_available) { - LMQ_LOG(trace, "processing incoming messages"); + // See if we can drain anything from the current queue before we potentially add to it + // below. + LMQ_LOG(trace, "processing queued jobs and messages"); + proxy_process_queue(); - // FIXME process any queued messages (i.e. that couldn't run but we had reserved tasks - // still free) first + LMQ_LOG(trace, "processing new incoming messages"); - // We round-robin connection queues for any pending messages (as long as we have enough - // waiting workers), but we don't want a lot of earlier connection requests to starve - // later request so each time through we continue from wherever we left off in the - // previous queue. + // We round-robin connections when pulling off pending messages one-by-one rather than + // pulling off all messages from one connection before moving to the next; thus in cases of + // contention we end up fairly distributing. 
+ const size_t num_sockets = remotes.size() + listener.connected(); + std::queue queue_index; + for (size_t i = 0; i < num_sockets; i++) + queue_index.push(i); - const size_t num_sockets = remotes.size() + listener.connected(); - if (last_conn_index >= num_sockets) - last_conn_index = 0; - std::queue queue_index; - for (size_t i = 1; i <= num_sockets; i++) - queue_index.push((last_conn_index + i) % num_sockets); + for (parts.clear(); !queue_index.empty() && workers.size() < max_workers; parts.clear()) { + size_t i = queue_index.front(); + queue_index.pop(); + auto &sock = listener.connected() ? (i == 0 ? listener : remotes[i - 1].second) : remotes[i].second; - for (parts.clear(); !queue_index.empty() && workers.size() < max_workers; parts.clear()) { - size_t i = queue_index.front(); - queue_index.pop(); - auto &sock = listener.connected() ? (i == 0 ? listener : remotes[i - 1].second) : remotes[i].second; + if (!recv_message_parts(sock, std::back_inserter(parts), zmq::recv_flags::dontwait)) + continue; - if (!recv_message_parts(sock, std::back_inserter(parts), zmq::recv_flags::dontwait)) - continue; + // We only pull this one message now but then requeue the socket so that after we check + // all other sockets we come back to this one to check again. 
+ queue_index.push(i); - last_conn_index = i; - queue_index.push(i); // We just read one, but there might be more messages waiting so requeue it at the end - - if (parts.empty()) { - LMQ_LOG(warn, "Ignoring empty (0-part) incoming message"); - continue; - } - - if (proxy_handle_builtin(last_conn_index, parts)) continue; - - proxy_to_worker(last_conn_index, parts); + if (parts.empty()) { + LMQ_LOG(warn, "Ignoring empty (0-part) incoming message"); + continue; } + + if (!proxy_handle_builtin(i, parts)) + proxy_to_worker(i, parts); } // Drop idle connections (if we haven't done it in a while) but *only* if we have some idle @@ -936,7 +926,7 @@ void LokiMQ::proxy_loop() { } } -std::pair LokiMQ::get_command(std::string& command) { +std::pair LokiMQ::get_command(std::string& command) { if (command.size() > MAX_CATEGORY_LENGTH + 1 + MAX_COMMAND_LENGTH) { LMQ_LOG(warn, "Invalid command '", command, "': command too long"); return {}; @@ -972,41 +962,179 @@ std::pair LokiMQ::get_c return {&catit->second, &callback_it->second}; } -std::pair &LokiMQ::proxy_lookup_peer(zmq::message_t& msg, ) { + +void LokiMQ::proxy_worker_message(std::vector& parts) { + // Process messages sent by workers + if (parts.size() != 2) { + LMQ_LOG(error, "Received send invalid ", parts.size(), "-part message"); + return; + } + auto route = view(parts[0]), cmd = view(parts[1]); + LMQ_LOG(trace, "worker message from ", route); + assert(route.size() >= 2 && route[0] == 'w' && route[1] >= '0' && route[1] <= '9'); + string_view worker_id_str{&route[1], route.size()-1}; // Chop off the leading "w" + unsigned int worker_id = detail::extract_unsigned(worker_id_str); + if (!worker_id_str.empty() /* didn't consume everything */ || worker_id >= workers.size()) { + LMQ_LOG(error, "Worker id '", route, "' is invalid, unable to process worker command"); + return; + } + + auto& run = workers[worker_id]; + + LMQ_LOG(trace, "received ", cmd, " command from ", route); + if (cmd == "RAN") { + LMQ_LOG(debug, "Worker ", 
route, " finished ", run.command); + if (run.is_batch_job) { + assert(batch_jobs_active > 0); + batch_jobs_active--; + bool clear_job = false; + if (run.batch_jobno == -1) { + // Returned from the completion function + clear_job = true; + } else { + auto status = run.batch->job_finished(); + if (status == detail::BatchStatus::complete) { + batch_jobs.emplace(run.batch, -1); + } else if (status == detail::BatchStatus::done) { + clear_job = true; + } + } + + if (clear_job) { + batches.erase(run.batch); + delete run.batch; + run.batch = nullptr; + } + } else { + assert(run.cat->active_threads > 0); + run.cat->active_threads--; + } + if (max_workers == 0) { // Shutting down + LMQ_LOG(trace, "Telling worker ", route, " to quit"); + route_control(workers_socket, route, "QUIT"); + } else { + idle_workers.push_back(worker_id); + } + } else if (cmd == "QUITTING") { + workers[worker_id].thread.join(); + LMQ_LOG(debug, "Worker ", route, " exited normally"); + } else { + LMQ_LOG(error, "Worker ", route, " sent unknown control message: `", cmd, "'"); + } +} + +decltype(LokiMQ::peers)::iterator LokiMQ::proxy_lookup_peer(zmq::message_t& msg) { std::string pubkey; bool service_node; try { - extract_pubkey(parts.back(), pubkey, service_node); + extract_pubkey(msg, pubkey, service_node); } catch (...) 
{ - LMQ_LOG(error, "Internal error: socket User-Id not set or invalid; dropping message"); - return; + LMQ_LOG(error, "Internal error: message metadata not set or invalid; dropping message"); + throw std::out_of_range("message pubkey metadata invalid"); } auto it = peers.find(pubkey); if (it == peers.end()) - it = peers.emplace(std::move(pubkey), peer_info{}); - auto &peer_info = peers[pubkey]; - peer_info.service_node |= service_node; + it = peers.emplace(std::move(pubkey), peer_info{}).first; + it->second.service_node |= service_node; + return it; +} +bool LokiMQ::proxy_handle_builtin(size_t conn_index, std::vector& parts) { + (void) conn_index; // FIXME + auto cmd = view(parts.front()); + if (cmd == "BYE") { + auto pit = proxy_lookup_peer(parts.front()); + proxy_close_outgoing(pit); + return true; + } + else if (cmd == "FORBIDDEN" || cmd == "NOT_A_SERVICE_NODE") { + return true; // FIXME - ignore these? Log? + } + return false; +} + +LokiMQ::run_info& LokiMQ::get_idle_worker() { + if (idle_workers.empty()) { + size_t id = workers.size(); + assert(workers.capacity() > id); + workers.emplace_back(); + auto& r = workers.back(); + r.worker_id = id; + r.routing_id = "w" + std::to_string(id); + return r; + } + size_t id = idle_workers.back(); + idle_workers.pop_back(); + return workers[id]; +} + +void LokiMQ::set_batch_threads(unsigned int threads) { + if (proxy_thread.joinable()) + throw std::logic_error("Cannot change reserved batch threads after calling `start()`"); + batch_jobs_reserved = threads; +} + +void LokiMQ::set_general_threads(unsigned int threads) { + if (proxy_thread.joinable()) + throw std::logic_error("Cannot change general thread count after calling `start()`"); + general_workers = threads; +} + +LokiMQ::run_info& LokiMQ::run_info::operator=(pending_command&& pending) { + is_batch_job = false; + cat = &pending.cat; + command = std::move(pending.command); + pubkey = std::move(pending.pubkey); + service_node = pending.service_node; + data_parts = 
std::move(pending.data_parts); + callback = pending.callback; + return *this; +} +LokiMQ::run_info& LokiMQ::run_info::operator=(batch_job&& bj) { + is_batch_job = true; + batch_jobno = bj.second; + batch = bj.first; + return *this; } -bool LokiMQ::proxy_handle_builtin(size_t conn_index, std::vector& parts) { - auto cmd = view(parts.front()); - if (cmd == "BYE") { - +void LokiMQ::proxy_run_worker(run_info& run) { + if (!run.thread.joinable()) + run.thread = std::thread{&LokiMQ::worker_thread, this, run.worker_id}; + else + send_routed_message(workers_socket, run.routing_id, "RUN"); +} + + +void LokiMQ::proxy_process_queue() { + // First up: process any batch jobs; since these are internal they are given higher priority. + while (!batch_jobs.empty() && + (batch_jobs_active < batch_jobs_reserved || workers.size() - idle_workers.size() < general_workers)) { + proxy_run_worker(get_idle_worker() = std::move(batch_jobs.front())); + batch_jobs.pop(); + batch_jobs_active++; + } + + for (auto it = pending_commands.begin(); it != pending_commands.end() && active_workers() < max_workers; ) { + auto& pending = *it; + if (pending.cat.active_threads < pending.cat.reserved_threads + || active_workers() < general_workers) { + proxy_run_worker(get_idle_worker() = std::move(pending)); + pending.cat.queued--; + pending.cat.active_threads++; + assert(pending.cat.queued >= 0); + it = pending_commands.erase(it); + } else { + ++it; // no available general or reserved worker spots for this job right now + } } } void LokiMQ::proxy_to_worker(size_t conn_index, std::vector& parts) { - std::string pubkey; - bool service_node; - try { - extract_pubkey(parts.back(), pubkey, service_node); - } catch (...) 
{ - LMQ_LOG(error, "Internal error: socket User-Id not set or invalid; dropping message"); - return; - } + auto pit = proxy_lookup_peer(parts.back()); + string_view pubkey = pit->first; + auto& peer_info = pit->second; bool is_outgoing_conn = !listener.connected() || conn_index > 0; size_t command_part_index = is_outgoing_conn ? 0 : 1; @@ -1023,46 +1151,38 @@ void LokiMQ::proxy_to_worker(size_t conn_index, std::vector& par auto& category = *cat_call.first; - auto &peer_info = peers[pubkey]; - peer_info.service_node |= service_node; - if (!proxy_check_auth(pubkey, conn_index, peer_info, command, category, parts.back())) return; - bool can_work = false; - if (category.active_threads < category.reserved_threads) - // Our category still has reserved spots so we get to run even if it would exceed - // general_workers. - can_work = true; - else { - // We don't have a reserved spot, so the only way we get to run now is if there is an idle - // worker *and* we don't already have >= `general_workers` threads already doing things. - unsigned int working_threads = workers.size() - idle_workers.size(); - if (working_threads < general_workers) - can_work = true; - } + // Steal any data message parts + size_t data_part_index = command_part_index + 1; + std::vector data_parts; + data_parts.reserve(parts.size() - data_part_index); + for (auto it = parts.begin() + data_part_index; it != parts.end(); ++it) + data_parts.push_back(std::move(*it)); - if (!can_work) { - // We can't handle this now so queue it for later consideration when some workers free up. 
- LMQ_LOG(debug, "No available free workers, queuing task for later"); - // FIXME TODO + if (category.active_threads >= category.reserved_threads && active_workers() >= general_workers) { + // No free reserved or general spots, try to queue it for later + if (category.max_queue >= 0 && category.queued >= category.max_queue) { + LMQ_LOG(warn, "No space to queue incoming command ", command, "; already have ", category.queued, + " commands queued in that category (max ", category.max_queue, "); dropping message"); + return; + } + + LMQ_LOG(debug, "No available free workers, queuing ", command, " for later"); + pending_commands.emplace_back(category, std::move(command), std::move(data_parts), cat_call.second, pubkey, peer_info.service_node); + category.queued++; return; } - size_t index; - std::unique_ptr new_run; - if (idle_workers.empty()) { - index = workers.size(); - assert(workers.capacity() > index); - new_run = std::make_unique(); - new_run->routing_id = "w" + std::to_string(index); - } else { - index = idle_workers.back(); - } - - auto& run = new_run ? *new_run : workers[index]; - run.pubkey = std::move(pubkey); - run.service_node = service_node; + auto& run = get_idle_worker(); + run.is_batch_job = false; + run.cat = &category; + run.command = std::move(command); + run.pubkey = pubkey; + run.service_node = peer_info.service_node; + run.data_parts = std::move(data_parts); + run.callback = cat_call.second; if (is_outgoing_conn) { peer_info.activity(); // outgoing connection activity, pump the activity timer @@ -1074,32 +1194,14 @@ void LokiMQ::proxy_to_worker(size_t conn_index, std::vector& par peer_info.incoming = route; } - LMQ_LOG(trace, __FILE__, __LINE__, "Invoking incoming ", command, " from ", - run.service_node ? "SN " : "non-SN ", to_hex(run.pubkey), " @ ", peer_address(parts.back()), - " on worker ", index); + LMQ_LOG(trace, "Forwarding incoming ", run.command, " from ", run.service_node ? 
"SN " : "non-SN ", + to_hex(run.pubkey), " @ ", peer_address(parts.back()), " to worker ", run.routing_id); - run.command = std::move(command); - run.callback = cat_call.second; - - // Steal any extra argument messages (after the command name) - run.message_parts.clear(); - for (size_t i = (command_part_index + 1); i < parts.size(); i++) - run.message_parts.push_back(std::move(parts[i])); - - if (new_run) { - // The thread processes the first job immediately upon startup, so just start it and don't - // send anything. - workers.push_back(std::move(run)); - workers.back().thread = std::thread{&LokiMQ::worker_thread, this, index}; - } else { - // The worker is idling, send it a RUN (prefixed with the route, for the ROUTER socket) - // to kick it into action - idle_workers.pop_back(); - send_routed_message(workers_socket, run.routing_id, "RUN"); - } + proxy_run_worker(run); + category.active_threads++; } -bool LokiMQ::proxy_check_auth(const std::string& pubkey, size_t conn_index, const peer_info& peer, const std::string& command, const category& cat, zmq::message_t& msg) { +bool LokiMQ::proxy_check_auth(string_view pubkey, size_t conn_index, const peer_info& peer, const std::string& command, const category& cat, zmq::message_t& msg) { bool is_outgoing_conn = !listener.connected() || conn_index > 0; std::string reply; if (peer.auth_level < cat.access.auth) { @@ -1138,7 +1240,7 @@ bool LokiMQ::proxy_check_auth(const std::string& pubkey, size_t conn_index, cons void LokiMQ::process_zap_requests(zmq::socket_t &zap_auth) { std::vector frames; for (frames.reserve(7); recv_message_parts(zap_auth, std::back_inserter(frames), zmq::recv_flags::dontwait); frames.clear()) { - if (LogLevel::trace >= log_level()) { + if (log_level() >= LogLevel::trace) { std::ostringstream o; o << "Processing ZAP authentication request:"; for (size_t i = 0; i < frames.size(); i++) { @@ -1255,6 +1357,13 @@ void LokiMQ::connect(const std::string &pubkey, std::chrono::milliseconds keep_a 
detail::send_control(get_control_socket(), "CONNECT", bt_serialize({{"pubkey",pubkey}, {"keep-alive",keep_alive.count()}, {"hint",hint}})); } +void LokiMQ::job(std::function f) { + auto* b = new Batch; + b->add_job(std::move(f)); + auto* baseptr = static_cast(b); + detail::send_control(get_control_socket(), "BATCH", bt_serialize(reinterpret_cast(baseptr))); +} + } diff --git a/lokimq/lokimq.h b/lokimq/lokimq.h index e68d74c..6a9aee8 100644 --- a/lokimq/lokimq.h +++ b/lokimq/lokimq.h @@ -28,11 +28,11 @@ #pragma once -#include "zmq.hpp" #include #include #include #include +#include #include #include #include @@ -40,6 +40,7 @@ #include #include #include +#include #include "bt_serialize.h" #include "string_view.h" @@ -108,6 +109,10 @@ public: void reply(const std::string& command, Args&&... args); }; +// Forward declarations; see batch.h +namespace detail { class Batch; } +template class Batch; + /** The keep-alive time for a send() that results in a establishing a new outbound connection. To * use a longer keep-alive to a host call `connect()` first with the desired keep-alive time or pass @@ -214,7 +219,7 @@ public: private: - /// The lookup function that tells us where to connect to a peer + /// The lookup function that tells us where to connect to a peer, or empty if not found. SNRemoteAddress peer_lookup; /// Callback to see whether the incoming connection is allowed @@ -222,7 +227,7 @@ private: /// The log level; this is atomic but we use relaxed order to set and access it (so changing it /// might not be instantly visible on all threads, but that's okay). 
- std::atomic log_lvl; + std::atomic log_lvl{LogLevel::warn}; /// The callback to call with log messages Logger logger; @@ -313,26 +318,50 @@ private: /// indices of idle, active workers std::vector idle_workers; - /// Maximum number of general task workers, specified during construction - unsigned int general_workers; + /// Maximum number of general task workers, specified by set_general_threads() + unsigned int general_workers = std::thread::hardware_concurrency(); /// Maximum number of possible worker threads we can have. This is calculated when starting, - /// and equals general_workers plus the sum of all categories' reserved threads counts. This is - /// also used to signal a shutdown; we set it to 0 when quitting. + /// and equals general_workers plus the sum of all categories' reserved threads counts plus the + /// reserved batch workers count. This is also used to signal a shutdown; we set it to 0 when + /// quitting. unsigned int max_workers; + /// Number of active workers + unsigned int active_workers() const { return workers.size() - idle_workers.size(); } + /// Worker thread loop void worker_thread(unsigned int index); /// Does the proxying work void proxy_loop(); + void proxy_worker_message(std::vector& parts); + + void proxy_process_queue(); + + /// Looks up a peers element given a zmq message (which has the pubkey and sn status metadata + /// set during initial connection authentication), creating a new peer element if required. + decltype(peers)::iterator proxy_lookup_peer(zmq::message_t& msg); + /// Handles built-in primitive commands in the proxy thread for things like "BYE" that have to /// be done in the proxy thread anyway (if we forwarded to a worker the worker would just have /// to send an instruction back to the proxy to do it). Returns true if one was handled, false /// to continue with sending to a worker. 
bool proxy_handle_builtin(size_t conn_index, std::vector& parts); + struct run_info; + /// Gets an idle worker's run_info and removes the worker from the idle worker list. If there + /// is no idle worker this creates a new `workers` element for a new worker (and so you should + /// only call this if new workers are permitted). Note that if this creates a new work info the + /// worker will *not* yet be started, so the caller must create the thread (in `.thread`) after + /// setting up the job if `.thread.joinable()` is false. + run_info& get_idle_worker(); + + /// Runs the worker; called after the `run` object has been set up. If the worker thread hasn't + /// been created then it is spawned; otherwise it is sent a RUN command. + void proxy_run_worker(run_info& run); + /// Sets up a job for a worker then signals the worker (or starts a worker thread) void proxy_to_worker(size_t conn_index, std::vector& parts); @@ -350,8 +379,7 @@ private: /// existing or a new one). std::pair proxy_connect(bt_dict&& data); - /// DISCONNECT command telling us to disconnect our remote connection to the given pubkey (if we - /// have one). + /// Called to disconnect our remote connection to the given pubkey (if we have one). void proxy_disconnect(const std::string& pubkey); /// SEND command. Does a connect first, if necessary. @@ -361,6 +389,18 @@ private: /// weaker (i.e. it cannot reconnect to the SN if the connection is no longer open). void proxy_reply(bt_dict&& data); + /// Currently active batches. + std::unordered_set batches; + /// Individual batch jobs waiting to run + using batch_job = std::pair; + std::queue batch_jobs; + unsigned int batch_jobs_active = 0; + unsigned int batch_jobs_reserved = std::max((std::thread::hardware_concurrency() + 1) / 2, 1u); + + /// BATCH command. Called with a Batch (see lokimq/batch.h) object pointer for the proxy to + /// take over and queue batch jobs. 
+ void proxy_batch(detail::Batch* batch); + /// ZAP (https://rfc.zeromq.org/spec:27/ZAP/) authentication handler; this is called with the /// zap auth socket to do non-blocking processing of any waiting authentication requests waiting /// on it to verify whether the connection is from a valid/allowed SN. @@ -383,8 +423,8 @@ private: std::unordered_map commands; unsigned int reserved_threads = 0; unsigned int active_threads = 0; - std::queue> pending; // FIXME - vector? int max_queue = 200; + int queued = 0; category(Access access, unsigned int reserved_threads, int max_queue) : access{access}, reserved_threads{reserved_threads}, max_queue{max_queue} {} @@ -401,14 +441,27 @@ private: /// Retrieve category and callback from a command name, including alias mapping. Warns on /// invalid commands and returns nullptrs. The command name will be updated in place if it is /// aliased to another command. - std::pair get_command(std::string& command); + std::pair get_command(std::string& command); /// Checks a peer's authentication level. Returns true if allowed, warns and returns false if /// not. - bool proxy_check_auth(const std::string& pubkey, size_t conn_index, const peer_info& peer, + bool proxy_check_auth(string_view pubkey, size_t conn_index, const peer_info& peer, const std::string& command, const category& cat, zmq::message_t& msg); - /// + /// Details for a pending command; such a command already has authenticated access and is just + /// waiting for a thread to become available to handle it. 
+ struct pending_command { + category& cat; + std::string command; + std::vector data_parts; + const CommandCallback* callback; + std::string pubkey; + bool service_node; + + pending_command(category& cat, std::string command, std::vector data_parts, const CommandCallback* callback, std::string pubkey, bool service_node) + : cat{cat}, command{std::move(command)}, data_parts{std::move(data_parts)}, callback{callback}, pubkey{std::move(pubkey)}, service_node{service_node} {} + }; + std::list pending_commands; /// End of proxy-specific members @@ -418,16 +471,33 @@ private: /// Structure that contains the data for a worker thread - both the thread itself, plus any /// transient data we are passing into the thread. struct run_info { + bool is_batch_job = false; + + // If is_batch_job is false then these will be set appropriately (if is_batch_job is true then + // these shouldn't be accessed and likely contain stale data). + category *cat; std::string command; std::string pubkey; bool service_node = false; - const CommandCallback* callback = nullptr; - std::vector message_parts; + std::vector data_parts; - private: - friend class LokiMQ; + // If is_batch_job is true then these are set (if is_batch_job is false then don't access these!): + int batch_jobno; // >= 0 for a job, -1 for the completion job + + union { + const CommandCallback* callback; // set if !is_batch_job + detail::Batch* batch; // set if is_batch_job + }; + + // These belong to the proxy thread and must not be accessed by a worker: std::thread thread; - std::string routing_id; + size_t worker_id; // The index in `workers` + std::string routing_id; // "w123" where 123 == worker_id + + /// Loads the run info with a pending command + run_info& operator=(pending_command&& pending); + /// Loads the run info with a batch job + run_info& operator=(batch_job&& bj); }; /// Data passed to workers for the RUN command. 
The proxy thread sets elements in this before /// sending RUN to a worker then the worker uses it to get call info, and only allocates it @@ -461,7 +531,7 @@ public: * connection string such as "tcp://1.2.3.4:23456" to which a connection should be established * to reach that service node. Note that this function is only called if there is no existing * connection to that service node, and that the function is never called for a connection to - * self (that uses an internal connection instead). + * self (that uses an internal connection instead). Should return empty for not found. * * @param allow_incoming is a callback that LokiMQ can use to determine whether an incoming * connection should be allowed at all and, if so, whether the connection is from a known @@ -472,11 +542,6 @@ public: * * @param log a function or callable object that writes a log message. If omitted then all log * messages are suppressed. - * - * @param general_workers the maximum number of worker threads to start for general tasks. - * These threads can be used for any command, and will be created (up to the limit) on demand. - * Note that individual categories with reserved threads can create threads in addition to the - * amount specified here. The default (0) means std::thread::hardware_concurrency(). */ LokiMQ( std::string pubkey, std::string privkey, @@ -484,8 +549,7 @@ public: std::vector bind, SNRemoteAddress peer_lookup, AllowFunc allow_connection, - Logger logger = [](LogLevel, const char*, int, std::string) { }, - unsigned int general_workers = 0); + Logger logger = [](LogLevel, const char*, int, std::string) { }); /** * Destructor; instructs the proxy to quit. The proxy tells all workers to quit, waits for them @@ -521,10 +585,11 @@ public: * that category. * * @param max_queue is the maximum number of incoming messages in this category that we will - * queue up when waiting for a worker to become available for this category. 
Once the queue - * for a category exceeds this many incoming messages then new messages will be dropped until - * some messages are processed off the queue. -1 means unlimited, 0 means we will just drop - * messages for this category when no workers are available. + * queue up when waiting for a worker to become available for this category. Once the queue for + * a category exceeds this many incoming messages then new messages will be dropped until some + * messages are processed off the queue. -1 means unlimited, 0 means we will never queue (which + * means just dropping messages for this category if no workers are available to instantly + * handle the request). */ void add_category(std::string name, Access access_level, unsigned int reserved_threads = 0, int max_queue = 200); @@ -556,6 +621,26 @@ public: */ void add_command_alias(std::string from, std::string to); + /** + * Sets the number of worker threads reserved for batch jobs. If not called this defaults to + * half the number of hardware threads available (rounded up). This works exactly like reserved_threads + * for a category, but applies to batch jobs. See category for details. + * + * Cannot be called after start()ing the LokiMQ instance. + */ + void set_batch_threads(unsigned int threads); + + /** + * Sets the number of general worker threads. This is the target number of threads to run that + * we generally try not to exceed. These threads can be used for any command, and will be + * created (up to the limit) on demand. Note that individual categories (or batch jobs) with + * reserved threads can create threads in addition to the amount specified here if necessary to + * fulfill the reserved threads count for the category. + * + * Cannot be called after start()ing the LokiMQ instance. 
+ */ + void set_general_threads(unsigned int threads); + /** * Finish starting up: binds to the bind locations given in the constructor and launches the * proxy thread to handle message dispatching between remote nodes and worker threads. @@ -636,6 +721,20 @@ public: /// this returns the generated keys. const std::string& get_pubkey() const { return pubkey; } const std::string& get_privkey() const { return privkey; } + + /** + * Batches a set of jobs to be executed by workers, optionally followed by a completion function. + * + * Must include lokimq/batch.h to use. + */ + template + void batch(Batch&& batch); + + /** + * Queues a single job to be executed with no return value. This is a shortcut for creating and + * submitting a single-job, no-completion batch. + */ + void job(std::function f); }; /// Namespace for options to the send() method @@ -766,6 +865,17 @@ void LokiMQ::log_(LogLevel lvl, const char* file, int line, const T&... stuff) { logger(lvl, file, line, os.str()); } +inline std::ostream &operator<<(std::ostream &os, LogLevel lvl) { + os << (lvl == LogLevel::trace ? "trace" : + lvl == LogLevel::debug ? "debug" : + lvl == LogLevel::info ? "info" : + lvl == LogLevel::warn ? "warn" : + lvl == LogLevel::error ? "ERROR" : + lvl == LogLevel::fatal ? "FATAL" : + "unknown"); + return os; +} + } // vim:sw=4:et
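
Reviewer note: the admission rule that `proxy_process_queue()` applies to queued commands can be sketched in isolation. This is a simplified model under stated assumptions, not code from the patch: `Category`, `Pending`, and `Scheduler` are hypothetical stand-ins for `category`, `pending_command`, and the proxy state, and the `active` counter stands in for `active_workers()`. A queued job starts when its category still has a free reserved slot or when fewer than `general_workers` workers are busy; otherwise it stays in the queue.

```cpp
#include <cassert>
#include <list>

// Hypothetical stand-in for the patch's `category` (only the thread counters).
struct Category {
    unsigned reserved_threads = 0;
    unsigned active_threads = 0;
};

// Hypothetical stand-in for `pending_command`; `id` exists only for illustration.
struct Pending {
    Category* cat;
    int id;
};

// Hypothetical model of the proxy's scheduling state.
struct Scheduler {
    unsigned general_workers = 0;  // soft cap on workers running general jobs
    unsigned max_workers = 0;      // hard cap: general plus all reserved slots
    unsigned active = 0;           // plays the role of active_workers()
    std::list<Pending> pending;

    // Walks the queue once and starts every job that has a free reserved slot
    // in its category or a free general slot. Returns the number started.
    unsigned process_queue() {
        unsigned started = 0;
        for (auto it = pending.begin(); it != pending.end() && active < max_workers; ) {
            Category& cat = *it->cat;
            if (cat.active_threads < cat.reserved_threads || active < general_workers) {
                cat.active_threads++;
                active++;
                started++;
                it = pending.erase(it);  // job handed to a worker
            } else {
                ++it;  // no reserved or general slot for this job right now
            }
        }
        assert(active <= max_workers);
        return started;
    }
};
```

With one general worker and one category holding a single reserved thread, a queue of two unreserved jobs followed by one reserved job starts the first and third; the second stays queued until a general slot frees up. A job later in the queue can therefore overtake an earlier one when its category has reserved capacity, which is why the model keeps a `std::list` (cheap mid-sequence erase), as the patch does for `pending_commands`.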