Merge pull request #76 from jagerman/optimizations

Optimizations
Jason Rhinelander 2022-05-30 10:51:40 -03:00 committed by GitHub
commit c4b7aa9b23
8 changed files with 152 additions and 80 deletions

CMakeLists.txt

@@ -17,7 +17,7 @@ cmake_minimum_required(VERSION 3.7)
 set(CMAKE_OSX_DEPLOYMENT_TARGET 10.12 CACHE STRING "macOS deployment target (Apple clang only)")
 
 project(liboxenmq
-    VERSION 1.2.11
+    VERSION 1.2.12
     LANGUAGES CXX C)
 
 include(GNUInstallDirs)
@@ -140,8 +140,12 @@ if(WARNINGS_AS_ERRORS)
     target_compile_options(oxenmq PRIVATE -Werror)
 endif()
 
-target_compile_features(oxenmq PUBLIC cxx_std_17)
-set_target_properties(oxenmq PROPERTIES POSITION_INDEPENDENT_CODE ON)
+set_target_properties(oxenmq PROPERTIES
+    POSITION_INDEPENDENT_CODE ON
+    CXX_STANDARD 17
+    CXX_STANDARD_REQUIRED ON
+    CXX_EXTENSIONS OFF
+)
 
 function(link_dep_libs target linktype libdirs)
     foreach(lib ${ARGN})

oxen-encoding (submodule)

@@ -1 +1 @@
-Subproject commit a0912ab4bf3b5e83b42715eff6f632c8912b21e4
+Subproject commit 79193e58fb26624d40cd2e95156f78160f2b9b3e

oxenmq/batch.h

@@ -266,6 +266,52 @@ private:
     }
 };
 
+// Similar to Batch<void>, but doesn't support a completion function and only handles a single task.
+class Job final : private detail::Batch {
+    friend class OxenMQ;
+
+public:
+    /// Constructs the Job to run a single task.  Takes any callable invokable with no arguments and
+    /// having no return value.  The task will be scheduled and run when the next worker thread is
+    /// available.  Any exceptions thrown by the job will be caught and squelched (the exception
+    /// terminates/completes the job).
+    explicit Job(std::function<void()> f, std::optional<TaggedThreadID> thread = std::nullopt)
+        : Job{std::move(f), thread ? thread->_id : 0}
+    {
+        if (thread && thread->_id == -1)
+            // There are some special case internal jobs where we allow this, but they use the
+            // private ctor below that doesn't have this check.
+            throw std::logic_error{"Cannot add a proxy thread job -- this makes no sense"};
+    }
+
+    // movable
+    Job(Job&&) = default;
+    Job &operator=(Job&&) = default;
+
+    // non-copyable
+    Job(const Job&) = delete;
+    Job &operator=(const Job&) = delete;
+
+private:
+    explicit Job(std::function<void()> f, int thread_id)
+        : job{std::move(f), thread_id} {}
+
+    std::pair<std::function<void()>, int> job;
+    bool done = false;
+
+    std::pair<size_t, bool> size() const override { return {1, job.second != 0}; }
+    std::vector<int> threads() const override { return {job.second}; }
+    void run_job(const int /*i*/) override {
+        try { job.first(); }
+        catch (...) {}
+    }
+    detail::BatchStatus job_finished() override { return {detail::BatchState::done, 0}; }
+    void job_completion() override {}  // Never called because we return ::done (not ::complete) above.
+};
+
 template <typename R>
 void OxenMQ::batch(Batch<R>&& batch) {
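Aside (not part of the diff): the new Job type is internal plumbing; callers still go through OxenMQ::job(). A minimal usage sketch, assuming a default-constructed instance and a hypothetical tagged thread named "stats":

    #include <chrono>
    #include <thread>
    #include <oxenmq/oxenmq.h>

    int main() {
        oxenmq::OxenMQ omq;
        auto stats = omq.add_tagged_thread("stats"); // tagged threads are set up before start()
        omq.start();

        omq.job([] { /* runs on the next free general worker */ });
        omq.job([] { /* runs only on the dedicated "stats" thread */ }, stats);

        std::this_thread::sleep_for(std::chrono::milliseconds{50}); // give the jobs a chance to run
    }

With this commit each such call allocates one small Job rather than a Batch<void> with its per-job result and completion bookkeeping.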

oxenmq/jobs.cpp

@@ -5,21 +5,20 @@
 namespace oxenmq {
 
 void OxenMQ::proxy_batch(detail::Batch* batch) {
     batches.insert(batch);
     const auto [jobs, tagged_threads] = batch->size();
     OMQ_TRACE("proxy queuing batch job with ", jobs, " jobs", tagged_threads ? " (job uses tagged thread(s))" : "");
     if (!tagged_threads) {
         for (size_t i = 0; i < jobs; i++)
-            batch_jobs.emplace(batch, i);
+            batch_jobs.emplace_back(batch, i);
     } else {
         // Some (or all) jobs have a specific thread target so queue any such jobs in the tagged
         // worker queue.
         auto threads = batch->threads();
         for (size_t i = 0; i < jobs; i++) {
             auto& jobs = threads[i] > 0
-                ? std::get<std::queue<batch_job>>(tagged_workers[threads[i] - 1])
+                ? std::get<batch_queue>(tagged_workers[threads[i] - 1])
                 : batch_jobs;
-            jobs.emplace(batch, i);
+            jobs.emplace_back(batch, i);
         }
     }
@@ -29,25 +28,22 @@ void OxenMQ::proxy_batch(detail::Batch* batch) {
 void OxenMQ::job(std::function<void()> f, std::optional<TaggedThreadID> thread) {
     if (thread && thread->_id == -1)
         throw std::logic_error{"job() cannot be used to queue an in-proxy job"};
-    auto* b = new Batch<void>;
-    b->add_job(std::move(f), thread);
-    auto* baseptr = static_cast<detail::Batch*>(b);
+    auto* j = new Job(std::move(f), thread);
+    auto* baseptr = static_cast<detail::Batch*>(j);
     detail::send_control(get_control_socket(), "BATCH", oxenc::bt_serialize(reinterpret_cast<uintptr_t>(baseptr)));
 }
 
 void OxenMQ::proxy_schedule_reply_job(std::function<void()> f) {
-    auto* b = new Batch<void>;
-    b->add_job(std::move(f));
-    batches.insert(b);
-    reply_jobs.emplace(static_cast<detail::Batch*>(b), 0);
+    auto* j = new Job(std::move(f));
+    reply_jobs.emplace_back(static_cast<detail::Batch*>(j), 0);
     proxy_skip_one_poll = true;
 }
 
-void OxenMQ::proxy_run_batch_jobs(std::queue<batch_job>& jobs, const int reserved, int& active, bool reply) {
+void OxenMQ::proxy_run_batch_jobs(batch_queue& jobs, const int reserved, int& active, bool reply) {
     while (!jobs.empty() && active_workers() < max_workers &&
            (active < reserved || active_workers() < general_workers)) {
         proxy_run_worker(get_idle_worker().load(std::move(jobs.front()), reply));
-        jobs.pop();
+        jobs.pop_front();
         active++;
     }
 }
@@ -98,11 +94,12 @@ void OxenMQ::_queue_timer_job(int timer_id) {
         return;
     }
-    auto* b = new Batch<void>;
-    b->add_job(func, thread);
+    detail::Batch* b;
     if (squelch) {
+        auto* bv = new Batch<void>;
+        bv->add_job(func, thread);
         running = true;
-        b->completion([this,timer_id](auto results) {
+        bv->completion([this,timer_id](auto results) {
             try { results[0].get(); }
             catch (const std::exception &e) { OMQ_LOG(warn, "timer job ", timer_id, " raised an exception: ", e.what()); }
             catch (...) { OMQ_LOG(warn, "timer job ", timer_id, " raised a non-std exception"); }
@@ -110,14 +107,16 @@ void OxenMQ::_queue_timer_job(int timer_id) {
             if (it != timer_jobs.end())
                 it->second.running = false;
         }, OxenMQ::run_in_proxy);
+        b = bv;
+    } else {
+        b = new Job(func, thread);
+    }
     batches.insert(b);
-    OMQ_TRACE("b: ", b->size().first, ", ", b->size().second, "; thread = ", thread);
     assert(b->size() == std::make_pair(size_t{1}, thread > 0));
     auto& queue = thread > 0
-        ? std::get<std::queue<batch_job>>(tagged_workers[thread - 1])
+        ? std::get<batch_queue>(tagged_workers[thread - 1])
         : batch_jobs;
-    queue.emplace(static_cast<detail::Batch*>(b), 0);
+    queue.emplace_back(static_cast<detail::Batch*>(b), 0);
 }
 
 void OxenMQ::add_timer(TimerID& timer, std::function<void()> job, std::chrono::milliseconds interval, bool squelch, std::optional<TaggedThreadID> thread) {
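Aside (not part of the diff): the squelch flag that picks between the two paths above is the one passed to add_timer(). A usage sketch, assuming a started instance:

    #include <chrono>
    #include <thread>
    #include <oxenmq/oxenmq.h>

    using namespace std::chrono_literals;

    int main() {
        oxenmq::OxenMQ omq;
        omq.start();
        // squelch defaults to true: a firing is skipped while the previous one is
        // still running, which is why that path needs Batch<void>'s completion
        // callback (above) to clear the `running` flag afterwards.
        omq.add_timer([] { /* periodic work */ }, 5s);
        // squelch=false permits overlapping runs, so no completion tracking is
        // needed and the timer can take the cheaper single-task Job path.
        omq.add_timer([] { /* fire-and-forget tick */ }, 100ms, /*squelch=*/false);
        std::this_thread::sleep_for(500ms); // let a few firings happen
    }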
@@ -171,8 +170,9 @@ TaggedThreadID OxenMQ::add_tagged_thread(std::string name, std::function<void()>
     auto& [run, busy, queue] = tagged_workers.emplace_back();
     busy = false;
     run.worker_id = tagged_workers.size(); // We want index + 1 (b/c 0 is used for non-tagged jobs)
-    run.worker_routing_id = "t" + std::to_string(run.worker_id);
-    OMQ_TRACE("Created new tagged thread ", name, " with routing id ", run.worker_routing_id);
+    run.worker_routing_name = "t" + std::to_string(run.worker_id);
+    run.worker_routing_id = "t" + std::string{reinterpret_cast<const char*>(&run.worker_id), sizeof(run.worker_id)};
+    OMQ_TRACE("Created new tagged thread ", name, " with routing id ", run.worker_routing_name);
     run.worker_thread = std::thread{&OxenMQ::worker_thread, this, run.worker_id, name, std::move(start)};

oxenmq/oxenmq-internal.h

@@ -86,6 +86,21 @@ inline bool recv_message_parts(zmq::socket_t& sock, std::vector<zmq::message_t>&
     return true;
 }
 
+// Same as above, but using a fixed sized array; this is only used for internal jobs (e.g. control
+// messages) where we know the message parts should never exceed a given size (this function does
+// not bounds check except in debug builds).  Returns the number of message parts received, or 0 on
+// read error.
+template <size_t N>
+inline size_t recv_message_parts(zmq::socket_t& sock, std::array<zmq::message_t, N>& parts, const zmq::recv_flags flags = zmq::recv_flags::none) {
+    for (size_t count = 0; ; count++) {
+        assert(count < N);
+        if (!sock.recv(parts[count], flags))
+            return 0;
+        if (!parts[count].more())
+            return count + 1;
+    }
+}
+
 inline const char* peer_address(zmq::message_t& msg) {
     try { return msg.gets("Peer-Address"); } catch (...) {}
     return "(unknown)";

oxenmq/oxenmq.h

@@ -45,6 +45,7 @@
 #include <cassert>
 #include <cstdint>
 #include <future>
+#include <variant>
 #include "zmq.hpp"
 #include "address.h"
 #include <oxenc/bt_serialize.h>
@@ -106,6 +107,7 @@ private:
     explicit constexpr TaggedThreadID(int id) : _id{id} {}
     friend class OxenMQ;
     template <typename R> friend class Batch;
+    friend class Job;
 };
 
 /// Opaque handler for a timer constructed by add_timer(...). Safe (and cheap) to copy. The only
@@ -455,8 +457,9 @@ private:
     /// Router socket to reach internal worker threads from proxy
     zmq::socket_t workers_socket{context, zmq::socket_type::router};
 
-    /// indices of idle, active workers
+    /// indices of idle, active workers; note that this vector is usually oversized
     std::vector<unsigned int> idle_workers;
+    size_t idle_worker_count = 0; // Actual # elements of idle_workers in use
 
     /// Maximum number of general task workers, specified by set_general_threads()
     int general_workers = std::max<int>(1, std::thread::hardware_concurrency());
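Aside (not part of the diff): the pattern in isolation, as a minimal sketch. The vector is sized once to max_workers (see the idle_workers.resize() call in proxy_loop_init below), after which marking a worker idle or claiming one is a plain indexed store/load with no allocation:

    #include <cstddef>
    #include <vector>

    struct idle_stack { // illustrative only; the real code keeps these as two plain members
        std::vector<unsigned int> slots; // sized to capacity up front, stays oversized
        size_t count = 0;                // how many entries are actually in use

        explicit idle_stack(size_t capacity) : slots(capacity) {}
        void push(unsigned int id) { slots[count++] = id; }
        unsigned int pop() { return slots[--count]; }
        bool empty() const { return count == 0; }
    };

active_workers() then reduces to workers.size() - idle_worker_count, as in the next hunk.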
@@ -468,7 +471,7 @@ private:
     int max_workers;
 
     /// Number of active workers
-    int active_workers() const { return workers.size() - idle_workers.size(); }
+    int active_workers() const { return workers.size() - idle_worker_count; }
 
     /// Worker thread loop. Tagged and start are provided for a tagged worker thread.
     void worker_thread(unsigned int index, std::optional<std::string> tagged = std::nullopt, std::function<void()> start = nullptr);
@@ -483,7 +486,9 @@ private:
     void proxy_conn_cleanup();
 
-    void proxy_worker_message(std::vector<zmq::message_t>& parts);
+    using control_message_array = std::array<zmq::message_t, 3>;
+    void proxy_worker_message(control_message_array& parts, size_t len);
 
     void proxy_process_queue();
@@ -575,18 +580,17 @@ private:
     /// weaker (i.e. it cannot reconnect to the SN if the connection is no longer open).
     void proxy_reply(oxenc::bt_dict_consumer data);
 
     /// Currently active batch/reply jobs; this is the container that owns the Batch instances
     std::unordered_set<detail::Batch*> batches;
 
     /// Individual batch jobs waiting to run; .second is the 0-n batch number or -1 for the
     /// completion job
     using batch_job = std::pair<detail::Batch*, int>;
-    std::queue<batch_job> batch_jobs, reply_jobs;
+    using batch_queue = std::deque<batch_job>;
+    batch_queue batch_jobs, reply_jobs;
     int batch_jobs_active = 0;
     int reply_jobs_active = 0;
     int batch_jobs_reserved = -1;
     int reply_jobs_reserved = -1;
 
     /// Runs any queued batch jobs
-    void proxy_run_batch_jobs(std::queue<batch_job>& jobs, int reserved, int& active, bool reply);
+    void proxy_run_batch_jobs(batch_queue& jobs, int reserved, int& active, bool reply);
 
     /// BATCH command. Called with a Batch<R> (see oxenmq/batch.h) object pointer for the proxy to
     /// take over and queue batch jobs.
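Aside (not part of the diff): the queue entry convention, restated. Each entry pairs the owning Batch with which piece of it to run; std::queue was dropped in favour of its default underlying std::deque so the same named type (batch_queue) can be used with emplace_back()/pop_front() everywhere, including inside the tagged_workers tuples:

    // {b, i} with i in [0, n)  -> run job #i of batch b
    // {b, -1}                  -> run b's completion callback
    using batch_job = std::pair<detail::Batch*, int>;
    using batch_queue = std::deque<batch_job>;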
@@ -608,7 +612,7 @@ private:
     void process_zap_requests();
 
     /// Handles a control message from some outer thread to the proxy
-    void proxy_control_message(std::vector<zmq::message_t>& parts);
+    void proxy_control_message(control_message_array& parts, size_t len);
 
     /// Closing any idle connections that have outlived their idle time. Note that this only
     /// affects outgoing connections; incomings connections are the responsibility of the other end.
@@ -744,8 +748,9 @@ private:
         // These belong to the proxy thread and must not be accessed by a worker:
         std::thread worker_thread;
-        size_t worker_id; // The index in `workers` (0-n) or index+1 in `tagged_workers` (1-n)
-        std::string worker_routing_id; // "w123" where 123 == worker_id; "n123" for tagged threads.
+        uint32_t worker_id; // The index in `workers` (0-n) or index+1 in `tagged_workers` (1-n)
+        std::string worker_routing_id; // "wXXXX" where XXXX is the raw bytes of worker_id, or tXXXX for tagged threads.
+        std::string worker_routing_name; // "w123" or "t123" -- human readable version of worker_routing_id
 
         /// Loads the run info with an incoming command
         run_info& load(category* cat, std::string command, ConnectionID conn, Access access, std::string remote,
@@ -769,7 +774,7 @@ private:
     /// Workers that are reserved for tagged thread tasks (as created with add_tagged_thread). The
     /// queue here is similar to worker_jobs, but contains only the tagged thread's jobs. The bool
     /// is whether the worker is currently busy (true) or available (false).
-    std::vector<std::tuple<run_info, bool, std::queue<batch_job>>> tagged_workers;
+    std::vector<std::tuple<run_info, bool, batch_queue>> tagged_workers;
 
 public:
 
     /**

oxenmq/proxy.cpp

@@ -271,14 +271,14 @@ void OxenMQ::proxy_reply(oxenc::bt_dict_consumer data) {
     }
 }
 
-void OxenMQ::proxy_control_message(std::vector<zmq::message_t>& parts) {
+void OxenMQ::proxy_control_message(OxenMQ::control_message_array& parts, size_t len) {
     // We throw an uncaught exception here because we only generate control messages internally in
     // oxenmq code: if one of these condition fail it's a oxenmq bug.
-    if (parts.size() < 2)
+    if (len < 2)
         throw std::logic_error("OxenMQ bug: Expected 2-3 message parts for a proxy control message");
     auto route = view(parts[0]), cmd = view(parts[1]);
     OMQ_TRACE("control message: ", cmd);
-    if (parts.size() == 3) {
+    if (len == 3) {
         OMQ_TRACE("...: ", parts[2]);
         auto data = view(parts[2]);
         if (cmd == "SEND") {
@@ -315,7 +315,7 @@ void OxenMQ::proxy_control_message(std::vector<zmq::message_t>& parts) {
             bind.push_back(std::move(b));
             return;
         }
-    } else if (parts.size() == 2) {
+    } else if (len == 2) {
         if (cmd == "START") {
             // Command send by the owning thread during startup; we send back a simple READY reply to
             // let it know we are running.
@@ -325,9 +325,9 @@ void OxenMQ::proxy_control_message(std::vector<zmq::message_t>& parts) {
             // close workers as they come back to READY status, and then close external
             // connections once all workers are done.
             max_workers = 0;
-            for (const auto &route : idle_workers)
-                route_control(workers_socket, workers[route].worker_routing_id, "QUIT");
-            idle_workers.clear();
+            for (size_t i = 0; i < idle_worker_count; i++)
+                route_control(workers_socket, workers[idle_workers[i]].worker_routing_id, "QUIT");
+            idle_worker_count = 0;
             for (auto& [run, busy, queue] : tagged_workers)
                 if (!busy)
                     route_control(workers_socket, run.worker_routing_id, "QUIT");
@@ -335,7 +335,7 @@ void OxenMQ::proxy_control_message(std::vector<zmq::message_t>& parts) {
         }
     }
     throw std::runtime_error("OxenMQ bug: Proxy received invalid control command: " +
-            std::string{cmd} + " (" + std::to_string(parts.size()) + ")");
+            std::string{cmd} + " (" + std::to_string(len) + ")");
 }
bool OxenMQ::proxy_bind(bind_data& b, size_t bind_index) {
@@ -404,6 +404,7 @@ void OxenMQ::proxy_loop_init() {
     }
 
     workers.reserve(max_workers);
+    idle_workers.resize(max_workers);
     if (!workers.empty())
         throw std::logic_error("Internal error: proxy thread started with active worker threads");
@@ -482,7 +483,7 @@ void OxenMQ::proxy_loop_init() {
     }
 
     for (auto&w : tagged_workers) {
-        OMQ_LOG(debug, "Telling tagged thread worker ", std::get<run_info>(w).worker_routing_id, " to finish startup");
+        OMQ_LOG(debug, "Telling tagged thread worker ", std::get<run_info>(w).worker_routing_name, " to finish startup");
         route_control(workers_socket, std::get<run_info>(w).worker_routing_id, "START");
     }
 }
@@ -497,6 +498,10 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
     }
     startup.set_value();
 
+    // Fixed array used for worker and control messages: these are never longer than 3 parts:
+    std::array<zmq::message_t, 3> control_parts;
+
+    // General vector for handling incoming messages:
     std::vector<zmq::message_t> parts;
 
     while (true) {
@@ -529,13 +534,13 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
         OMQ_TRACE("processing control messages");
         // Retrieve any waiting incoming control messages
-        for (parts.clear(); recv_message_parts(command, parts, zmq::recv_flags::dontwait); parts.clear()) {
-            proxy_control_message(parts);
+        while (size_t len = recv_message_parts(command, control_parts, zmq::recv_flags::dontwait)) {
+            proxy_control_message(control_parts, len);
         }
 
         OMQ_TRACE("processing worker messages");
-        for (parts.clear(); recv_message_parts(workers_socket, parts, zmq::recv_flags::dontwait); parts.clear()) {
-            proxy_worker_message(parts);
+        while (size_t len = recv_message_parts(workers_socket, control_parts, zmq::recv_flags::dontwait)) {
+            proxy_worker_message(control_parts, len);
         }
 
         OMQ_TRACE("processing timers");
@@ -731,7 +736,7 @@ void OxenMQ::proxy_process_queue() {
         if (!busy && !queue.empty()) {
             busy = true;
             proxy_run_worker(run.load(std::move(queue.front()), false, run.worker_id));
-            queue.pop();
+            queue.pop_front();
         }
     }

oxenmq/worker.cpp

@@ -48,10 +48,11 @@ bool worker_wait_for(OxenMQ& omq, zmq::socket_t& sock, std::vector<zmq::message_
 }
 
 void OxenMQ::worker_thread(unsigned int index, std::optional<std::string> tagged, std::function<void()> start) {
-    std::string routing_id = (tagged ? "t" : "w") + std::to_string(index); // for routing
-    std::string_view worker_id{tagged ? *tagged : routing_id}; // for debug
+    std::string routing_id = (tagged ? "t" : "w") +
+        std::string(reinterpret_cast<const char*>(&index), sizeof(index)); // for routing
+    std::string worker_id{tagged ? *tagged : "w" + std::to_string(index)}; // for debug
 
-    [[maybe_unused]] std::string thread_name = tagged.value_or("omq-" + routing_id);
+    [[maybe_unused]] std::string thread_name = tagged.value_or("omq-" + worker_id);
 #if defined(__linux__) || defined(__sun) || defined(__MINGW32__)
     if (thread_name.size() > 15) thread_name.resize(15);
     pthread_setname_np(pthread_self(), thread_name.c_str());
@@ -161,39 +162,37 @@ void OxenMQ::worker_thread(unsigned int index, std::optional<std::string> tagged
 
 OxenMQ::run_info& OxenMQ::get_idle_worker() {
-    if (idle_workers.empty()) {
-        size_t id = workers.size();
-        assert(workers.capacity() > id);
+    if (idle_worker_count == 0) {
+        uint32_t id = workers.size();
         workers.emplace_back();
         auto& r = workers.back();
         r.worker_id = id;
-        r.worker_routing_id = "w" + std::to_string(id);
+        r.worker_routing_id = "w" + std::string(reinterpret_cast<const char*>(&id), sizeof(id));
+        r.worker_routing_name = "w" + std::to_string(id);
         return r;
     }
-    size_t id = idle_workers.back();
-    idle_workers.pop_back();
+    size_t id = idle_workers[--idle_worker_count];
     return workers[id];
 }
 
-void OxenMQ::proxy_worker_message(std::vector<zmq::message_t>& parts) {
+void OxenMQ::proxy_worker_message(OxenMQ::control_message_array& parts, size_t len) {
     // Process messages sent by workers
-    if (parts.size() != 2) {
-        OMQ_LOG(error, "Received send invalid ", parts.size(), "-part message");
+    if (len != 2) {
+        OMQ_LOG(error, "Received send invalid ", len, "-part message");
         return;
     }
     auto route = view(parts[0]), cmd = view(parts[1]);
     OMQ_TRACE("worker message from ", route);
-    assert(route.size() >= 2 && (route[0] == 'w' || route[0] == 't') && route[1] >= '0' && route[1] <= '9');
+    if (route.size() != 5 || (route[0] != 'w' && route[0] != 't')) {
+        OMQ_LOG(error, "Received malformed worker id in worker message; unable to process worker command");
+        return;
+    }
     bool tagged_worker = route[0] == 't';
-    std::string_view worker_id_str{&route[1], route.size()-1}; // Chop off the leading "w" (or "t")
-    unsigned int worker_id = oxenc::detail::extract_unsigned(worker_id_str);
-    if (!worker_id_str.empty() /* didn't consume everything */ ||
-            (tagged_worker
-                ? 0 == worker_id || worker_id > tagged_workers.size() // tagged worker ids are indexed from 1 to N (0 means untagged)
-                : worker_id >= workers.size() // regular worker ids are indexed from 0 to N-1
-            )
-    ) {
-        OMQ_LOG(error, "Worker id '", route, "' is invalid, unable to process worker command");
+    uint32_t worker_id;
+    std::memcpy(&worker_id, route.data() + 1, 4);
+    if (tagged_worker
+            ? 0 == worker_id || worker_id > tagged_workers.size() // tagged worker ids are indexed from 1 to N (0 means untagged)
+            : worker_id >= workers.size()) { // regular worker ids are indexed from 0 to N-1
+        OMQ_LOG(error, "Received invalid worker id w" + std::to_string(worker_id) + " in worker message; unable to process worker command");
         return;
     }
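Aside (not part of the diff): both halves of the new fixed-width routing id in one place (illustrative helpers, not library API). The id is only ever produced and parsed within the same process, so host byte order is fine; worker_routing_name keeps a printable form for logs:

    #include <cstdint>
    #include <cstring>
    #include <string>
    #include <string_view>

    // Encode: 'w' or 't' plus the 4 raw bytes of the index -- always exactly
    // 5 bytes, replacing the old variable-length "w123" text form.
    std::string make_routing_id(char prefix, uint32_t index) {
        std::string id{prefix};
        id.append(reinterpret_cast<const char*>(&index), sizeof(index));
        return id;
    }

    // Decode: after checking size() == 5 and the prefix, a memcpy replaces the
    // old digit parsing.
    uint32_t routing_index(std::string_view route) {
        uint32_t index;
        std::memcpy(&index, route.data() + 1, sizeof(index));
        return index;
    }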
@@ -233,11 +232,11 @@ void OxenMQ::proxy_worker_message(std::vector<zmq::message_t>& parts) {
         } else {
             auto& jobs =
                 thread > 0
-                    ? std::get<std::queue<batch_job>>(tagged_workers[thread - 1]) // run in tagged thread
+                    ? std::get<batch_queue>(tagged_workers[thread - 1]) // run in tagged thread
                     : run.is_reply_job
                         ? reply_jobs
                         : batch_jobs;
-            jobs.emplace(batch, -1);
+            jobs.emplace_back(batch, -1);
         }
     } else if (state == detail::BatchState::done) {
         // No completion job
@@ -247,9 +246,7 @@ void OxenMQ::proxy_worker_message(std::vector<zmq::message_t>& parts) {
         }
         if (clear_job) {
             batches.erase(batch);
             delete batch;
-            run.to_run = static_cast<detail::Batch*>(nullptr);
         }
     } else {
         assert(run.cat->active_threads > 0);
@@ -259,7 +256,7 @@ void OxenMQ::proxy_worker_message(std::vector<zmq::message_t>& parts) {
             OMQ_TRACE("Telling worker ", route, " to quit");
             route_control(workers_socket, route, "QUIT");
         } else if (!tagged_worker) {
-            idle_workers.push_back(worker_id);
+            idle_workers[idle_worker_count++] = worker_id;
         }
     } else if (cmd == "QUITTING"sv) {
         run.worker_thread.join();
@@ -381,7 +378,7 @@ void OxenMQ::proxy_to_worker(int64_t conn_id, zmq::socket_t& sock, std::vector<z
     peer->activity(); // outgoing connection activity, pump the activity timer
 
     OMQ_TRACE("Forwarding incoming ", run.command, " from ", run.conn, " @ ", peer_address(parts[command_part_index]),
-            " to worker ", run.worker_routing_id);
+            " to worker ", run.worker_routing_name);
 
     proxy_run_worker(run);
     category.active_threads++;
@@ -412,7 +409,7 @@ void OxenMQ::proxy_inject_task(injected_task task) {
     }
 
     auto& run = get_idle_worker();
-    OMQ_TRACE("Forwarding incoming injected task ", task.command, " from ", task.remote, " to worker ", run.worker_routing_id);
+    OMQ_TRACE("Forwarding incoming injected task ", task.command, " from ", task.remote, " to worker ", run.worker_routing_name);
     run.load(&category, std::move(task.command), std::move(task.remote), std::move(task.callback));
     proxy_run_worker(run);