mirror of https://github.com/oxen-io/oxen-mq.git
Merge remote-tracking branch 'origin/stable' into ubuntu/impish
This commit is contained in:
commit
2e0df28595
|
@ -17,7 +17,7 @@ cmake_minimum_required(VERSION 3.7)
|
||||||
set(CMAKE_OSX_DEPLOYMENT_TARGET 10.12 CACHE STRING "macOS deployment target (Apple clang only)")
|
set(CMAKE_OSX_DEPLOYMENT_TARGET 10.12 CACHE STRING "macOS deployment target (Apple clang only)")
|
||||||
|
|
||||||
project(liboxenmq
|
project(liboxenmq
|
||||||
VERSION 1.2.11
|
VERSION 1.2.12
|
||||||
LANGUAGES CXX C)
|
LANGUAGES CXX C)
|
||||||
|
|
||||||
include(GNUInstallDirs)
|
include(GNUInstallDirs)
|
||||||
|
@ -76,7 +76,10 @@ elseif(BUILD_SHARED_LIBS)
|
||||||
pkg_check_modules(oxenc liboxenc IMPORTED_TARGET)
|
pkg_check_modules(oxenc liboxenc IMPORTED_TARGET)
|
||||||
|
|
||||||
if(oxenc_FOUND)
|
if(oxenc_FOUND)
|
||||||
target_link_libraries(oxenmq PUBLIC PkgConfig::oxenc)
|
# Work around cmake bug 22180 (PkgConfig::tgt not set if no flags needed)
|
||||||
|
if(TARGET PkgConfig::oxenc OR CMAKE_VERSION VERSION_GREATER_EQUAL "3.21")
|
||||||
|
target_link_libraries(oxenmq PUBLIC PkgConfig::oxenc)
|
||||||
|
endif()
|
||||||
else()
|
else()
|
||||||
add_subdirectory(oxen-encoding)
|
add_subdirectory(oxen-encoding)
|
||||||
target_link_libraries(oxenmq PUBLIC oxenc)
|
target_link_libraries(oxenmq PUBLIC oxenc)
|
||||||
|
@ -137,8 +140,12 @@ if(WARNINGS_AS_ERRORS)
|
||||||
target_compile_options(oxenmq PRIVATE -Werror)
|
target_compile_options(oxenmq PRIVATE -Werror)
|
||||||
endif()
|
endif()
|
||||||
|
|
||||||
target_compile_features(oxenmq PUBLIC cxx_std_17)
|
set_target_properties(oxenmq PROPERTIES
|
||||||
set_target_properties(oxenmq PROPERTIES POSITION_INDEPENDENT_CODE ON)
|
POSITION_INDEPENDENT_CODE ON
|
||||||
|
CXX_STANDARD 17
|
||||||
|
CXX_STANDARD_REQUIRED ON
|
||||||
|
CXX_EXTENSIONS OFF
|
||||||
|
)
|
||||||
|
|
||||||
function(link_dep_libs target linktype libdirs)
|
function(link_dep_libs target linktype libdirs)
|
||||||
foreach(lib ${ARGN})
|
foreach(lib ${ARGN})
|
||||||
|
|
|
@ -22,11 +22,11 @@ ExternalProject_Add(libzmq_external
|
||||||
CMAKE_ARGS -DCMAKE_C_COMPILER=${CMAKE_C_COMPILER} -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER}
|
CMAKE_ARGS -DCMAKE_C_COMPILER=${CMAKE_C_COMPILER} -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER}
|
||||||
-DWITH_LIBSODIUM=ON -DZMQ_BUILD_TESTS=OFF -DWITH_PERF_TOOL=OFF -DENABLE_DRAFTS=OFF
|
-DWITH_LIBSODIUM=ON -DZMQ_BUILD_TESTS=OFF -DWITH_PERF_TOOL=OFF -DENABLE_DRAFTS=OFF
|
||||||
-DBUILD_SHARED=OFF -DBUILD_STATIC=ON -DWITH_DOC=OFF -DCMAKE_INSTALL_PREFIX=${LIBZMQ_PREFIX}
|
-DBUILD_SHARED=OFF -DBUILD_STATIC=ON -DWITH_DOC=OFF -DCMAKE_INSTALL_PREFIX=${LIBZMQ_PREFIX}
|
||||||
BUILD_BYPRODUCTS ${LIBZMQ_PREFIX}/lib/libzmq.a
|
BUILD_BYPRODUCTS ${LIBZMQ_PREFIX}/${CMAKE_INSTALL_LIBDIR}/libzmq.a
|
||||||
)
|
)
|
||||||
|
|
||||||
add_library(libzmq_vendor STATIC IMPORTED GLOBAL)
|
add_library(libzmq_vendor STATIC IMPORTED GLOBAL)
|
||||||
add_dependencies(libzmq_vendor libzmq_external)
|
add_dependencies(libzmq_vendor libzmq_external)
|
||||||
set_target_properties(libzmq_vendor PROPERTIES
|
set_target_properties(libzmq_vendor PROPERTIES
|
||||||
INTERFACE_INCLUDE_DIRECTORIES ${LIBZMQ_PREFIX}/include
|
INTERFACE_INCLUDE_DIRECTORIES ${LIBZMQ_PREFIX}/include
|
||||||
IMPORTED_LOCATION ${LIBZMQ_PREFIX}/lib/libzmq.a)
|
IMPORTED_LOCATION ${LIBZMQ_PREFIX}/${CMAKE_INSTALL_LIBDIR}/libzmq.a)
|
||||||
|
|
Binary file not shown.
|
@ -1 +1 @@
|
||||||
Subproject commit a0912ab4bf3b5e83b42715eff6f632c8912b21e4
|
Subproject commit 79193e58fb26624d40cd2e95156f78160f2b9b3e
|
|
@ -266,6 +266,52 @@ private:
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Similar to Batch<void>, but doesn't support a completion function and only handles a single task.
|
||||||
|
class Job final : private detail::Batch {
|
||||||
|
friend class OxenMQ;
|
||||||
|
public:
|
||||||
|
/// Constructs the Job to run a single task. Takes any callable invokable with no arguments and
|
||||||
|
/// having no return value. The task will be scheduled and run when the next worker thread is
|
||||||
|
/// available. Any exceptions thrown by the job will be caught and squelched (the exception
|
||||||
|
/// terminates/completes the job).
|
||||||
|
|
||||||
|
explicit Job(std::function<void()> f, std::optional<TaggedThreadID> thread = std::nullopt)
|
||||||
|
: Job{std::move(f), thread ? thread->_id : 0}
|
||||||
|
{
|
||||||
|
if (thread && thread->_id == -1)
|
||||||
|
// There are some special case internal jobs where we allow this, but they use the
|
||||||
|
// private ctor below that doesn't have this check.
|
||||||
|
throw std::logic_error{"Cannot add a proxy thread job -- this makes no sense"};
|
||||||
|
}
|
||||||
|
|
||||||
|
// movable
|
||||||
|
Job(Job&&) = default;
|
||||||
|
Job &operator=(Job&&) = default;
|
||||||
|
|
||||||
|
// non-copyable
|
||||||
|
Job(const Job&) = delete;
|
||||||
|
Job &operator=(const Job&) = delete;
|
||||||
|
|
||||||
|
private:
|
||||||
|
explicit Job(std::function<void()> f, int thread_id)
|
||||||
|
: job{std::move(f), thread_id} {}
|
||||||
|
|
||||||
|
std::pair<std::function<void()>, int> job;
|
||||||
|
bool done = false;
|
||||||
|
|
||||||
|
std::pair<size_t, bool> size() const override { return {1, job.second != 0}; }
|
||||||
|
std::vector<int> threads() const override { return {job.second}; }
|
||||||
|
|
||||||
|
void run_job(const int /*i*/) override {
|
||||||
|
try { job.first(); }
|
||||||
|
catch (...) {}
|
||||||
|
}
|
||||||
|
|
||||||
|
detail::BatchStatus job_finished() override { return {detail::BatchState::done, 0}; }
|
||||||
|
|
||||||
|
void job_completion() override {} // Never called because we return ::done (not ::complete) above.
|
||||||
|
|
||||||
|
};
|
||||||
|
|
||||||
template <typename R>
|
template <typename R>
|
||||||
void OxenMQ::batch(Batch<R>&& batch) {
|
void OxenMQ::batch(Batch<R>&& batch) {
|
||||||
|
|
|
@ -5,21 +5,20 @@
|
||||||
namespace oxenmq {
|
namespace oxenmq {
|
||||||
|
|
||||||
void OxenMQ::proxy_batch(detail::Batch* batch) {
|
void OxenMQ::proxy_batch(detail::Batch* batch) {
|
||||||
batches.insert(batch);
|
|
||||||
const auto [jobs, tagged_threads] = batch->size();
|
const auto [jobs, tagged_threads] = batch->size();
|
||||||
OMQ_TRACE("proxy queuing batch job with ", jobs, " jobs", tagged_threads ? " (job uses tagged thread(s))" : "");
|
OMQ_TRACE("proxy queuing batch job with ", jobs, " jobs", tagged_threads ? " (job uses tagged thread(s))" : "");
|
||||||
if (!tagged_threads) {
|
if (!tagged_threads) {
|
||||||
for (size_t i = 0; i < jobs; i++)
|
for (size_t i = 0; i < jobs; i++)
|
||||||
batch_jobs.emplace(batch, i);
|
batch_jobs.emplace_back(batch, i);
|
||||||
} else {
|
} else {
|
||||||
// Some (or all) jobs have a specific thread target so queue any such jobs in the tagged
|
// Some (or all) jobs have a specific thread target so queue any such jobs in the tagged
|
||||||
// worker queue.
|
// worker queue.
|
||||||
auto threads = batch->threads();
|
auto threads = batch->threads();
|
||||||
for (size_t i = 0; i < jobs; i++) {
|
for (size_t i = 0; i < jobs; i++) {
|
||||||
auto& jobs = threads[i] > 0
|
auto& jobs = threads[i] > 0
|
||||||
? std::get<std::queue<batch_job>>(tagged_workers[threads[i] - 1])
|
? std::get<batch_queue>(tagged_workers[threads[i] - 1])
|
||||||
: batch_jobs;
|
: batch_jobs;
|
||||||
jobs.emplace(batch, i);
|
jobs.emplace_back(batch, i);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -29,25 +28,22 @@ void OxenMQ::proxy_batch(detail::Batch* batch) {
|
||||||
void OxenMQ::job(std::function<void()> f, std::optional<TaggedThreadID> thread) {
|
void OxenMQ::job(std::function<void()> f, std::optional<TaggedThreadID> thread) {
|
||||||
if (thread && thread->_id == -1)
|
if (thread && thread->_id == -1)
|
||||||
throw std::logic_error{"job() cannot be used to queue an in-proxy job"};
|
throw std::logic_error{"job() cannot be used to queue an in-proxy job"};
|
||||||
auto* b = new Batch<void>;
|
auto* j = new Job(std::move(f), thread);
|
||||||
b->add_job(std::move(f), thread);
|
auto* baseptr = static_cast<detail::Batch*>(j);
|
||||||
auto* baseptr = static_cast<detail::Batch*>(b);
|
|
||||||
detail::send_control(get_control_socket(), "BATCH", oxenc::bt_serialize(reinterpret_cast<uintptr_t>(baseptr)));
|
detail::send_control(get_control_socket(), "BATCH", oxenc::bt_serialize(reinterpret_cast<uintptr_t>(baseptr)));
|
||||||
}
|
}
|
||||||
|
|
||||||
void OxenMQ::proxy_schedule_reply_job(std::function<void()> f) {
|
void OxenMQ::proxy_schedule_reply_job(std::function<void()> f) {
|
||||||
auto* b = new Batch<void>;
|
auto* j = new Job(std::move(f));
|
||||||
b->add_job(std::move(f));
|
reply_jobs.emplace_back(static_cast<detail::Batch*>(j), 0);
|
||||||
batches.insert(b);
|
|
||||||
reply_jobs.emplace(static_cast<detail::Batch*>(b), 0);
|
|
||||||
proxy_skip_one_poll = true;
|
proxy_skip_one_poll = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void OxenMQ::proxy_run_batch_jobs(std::queue<batch_job>& jobs, const int reserved, int& active, bool reply) {
|
void OxenMQ::proxy_run_batch_jobs(batch_queue& jobs, const int reserved, int& active, bool reply) {
|
||||||
while (!jobs.empty() && active_workers() < max_workers &&
|
while (!jobs.empty() && active_workers() < max_workers &&
|
||||||
(active < reserved || active_workers() < general_workers)) {
|
(active < reserved || active_workers() < general_workers)) {
|
||||||
proxy_run_worker(get_idle_worker().load(std::move(jobs.front()), reply));
|
proxy_run_worker(get_idle_worker().load(std::move(jobs.front()), reply));
|
||||||
jobs.pop();
|
jobs.pop_front();
|
||||||
active++;
|
active++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -98,11 +94,12 @@ void OxenMQ::_queue_timer_job(int timer_id) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto* b = new Batch<void>;
|
detail::Batch* b;
|
||||||
b->add_job(func, thread);
|
|
||||||
if (squelch) {
|
if (squelch) {
|
||||||
|
auto* bv = new Batch<void>;
|
||||||
|
bv->add_job(func, thread);
|
||||||
running = true;
|
running = true;
|
||||||
b->completion([this,timer_id](auto results) {
|
bv->completion([this,timer_id](auto results) {
|
||||||
try { results[0].get(); }
|
try { results[0].get(); }
|
||||||
catch (const std::exception &e) { OMQ_LOG(warn, "timer job ", timer_id, " raised an exception: ", e.what()); }
|
catch (const std::exception &e) { OMQ_LOG(warn, "timer job ", timer_id, " raised an exception: ", e.what()); }
|
||||||
catch (...) { OMQ_LOG(warn, "timer job ", timer_id, " raised a non-std exception"); }
|
catch (...) { OMQ_LOG(warn, "timer job ", timer_id, " raised a non-std exception"); }
|
||||||
|
@ -110,14 +107,16 @@ void OxenMQ::_queue_timer_job(int timer_id) {
|
||||||
if (it != timer_jobs.end())
|
if (it != timer_jobs.end())
|
||||||
it->second.running = false;
|
it->second.running = false;
|
||||||
}, OxenMQ::run_in_proxy);
|
}, OxenMQ::run_in_proxy);
|
||||||
|
b = bv;
|
||||||
|
} else {
|
||||||
|
b = new Job(func, thread);
|
||||||
}
|
}
|
||||||
batches.insert(b);
|
|
||||||
OMQ_TRACE("b: ", b->size().first, ", ", b->size().second, "; thread = ", thread);
|
OMQ_TRACE("b: ", b->size().first, ", ", b->size().second, "; thread = ", thread);
|
||||||
assert(b->size() == std::make_pair(size_t{1}, thread > 0));
|
assert(b->size() == std::make_pair(size_t{1}, thread > 0));
|
||||||
auto& queue = thread > 0
|
auto& queue = thread > 0
|
||||||
? std::get<std::queue<batch_job>>(tagged_workers[thread - 1])
|
? std::get<batch_queue>(tagged_workers[thread - 1])
|
||||||
: batch_jobs;
|
: batch_jobs;
|
||||||
queue.emplace(static_cast<detail::Batch*>(b), 0);
|
queue.emplace_back(static_cast<detail::Batch*>(b), 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
void OxenMQ::add_timer(TimerID& timer, std::function<void()> job, std::chrono::milliseconds interval, bool squelch, std::optional<TaggedThreadID> thread) {
|
void OxenMQ::add_timer(TimerID& timer, std::function<void()> job, std::chrono::milliseconds interval, bool squelch, std::optional<TaggedThreadID> thread) {
|
||||||
|
@ -171,8 +170,9 @@ TaggedThreadID OxenMQ::add_tagged_thread(std::string name, std::function<void()>
|
||||||
auto& [run, busy, queue] = tagged_workers.emplace_back();
|
auto& [run, busy, queue] = tagged_workers.emplace_back();
|
||||||
busy = false;
|
busy = false;
|
||||||
run.worker_id = tagged_workers.size(); // We want index + 1 (b/c 0 is used for non-tagged jobs)
|
run.worker_id = tagged_workers.size(); // We want index + 1 (b/c 0 is used for non-tagged jobs)
|
||||||
run.worker_routing_id = "t" + std::to_string(run.worker_id);
|
run.worker_routing_name = "t" + std::to_string(run.worker_id);
|
||||||
OMQ_TRACE("Created new tagged thread ", name, " with routing id ", run.worker_routing_id);
|
run.worker_routing_id = "t" + std::string{reinterpret_cast<const char*>(&run.worker_id), sizeof(run.worker_id)};
|
||||||
|
OMQ_TRACE("Created new tagged thread ", name, " with routing id ", run.worker_routing_name);
|
||||||
|
|
||||||
run.worker_thread = std::thread{&OxenMQ::worker_thread, this, run.worker_id, name, std::move(start)};
|
run.worker_thread = std::thread{&OxenMQ::worker_thread, this, run.worker_id, name, std::move(start)};
|
||||||
|
|
||||||
|
|
|
@ -86,6 +86,21 @@ inline bool recv_message_parts(zmq::socket_t& sock, std::vector<zmq::message_t>&
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Same as above, but using a fixed sized array; this is only used for internal jobs (e.g. control
|
||||||
|
// messages) where we know the message parts should never exceed a given size (this function does
|
||||||
|
// not bounds check except in debug builds). Returns the number of message parts received, or 0 on
|
||||||
|
// read error.
|
||||||
|
template <size_t N>
|
||||||
|
inline size_t recv_message_parts(zmq::socket_t& sock, std::array<zmq::message_t, N>& parts, const zmq::recv_flags flags = zmq::recv_flags::none) {
|
||||||
|
for (size_t count = 0; ; count++) {
|
||||||
|
assert(count < N);
|
||||||
|
if (!sock.recv(parts[count], flags))
|
||||||
|
return 0;
|
||||||
|
if (!parts[count].more())
|
||||||
|
return count + 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
inline const char* peer_address(zmq::message_t& msg) {
|
inline const char* peer_address(zmq::message_t& msg) {
|
||||||
try { return msg.gets("Peer-Address"); } catch (...) {}
|
try { return msg.gets("Peer-Address"); } catch (...) {}
|
||||||
return "(unknown)";
|
return "(unknown)";
|
||||||
|
|
|
@ -45,6 +45,7 @@
|
||||||
#include <cassert>
|
#include <cassert>
|
||||||
#include <cstdint>
|
#include <cstdint>
|
||||||
#include <future>
|
#include <future>
|
||||||
|
#include <variant>
|
||||||
#include "zmq.hpp"
|
#include "zmq.hpp"
|
||||||
#include "address.h"
|
#include "address.h"
|
||||||
#include <oxenc/bt_serialize.h>
|
#include <oxenc/bt_serialize.h>
|
||||||
|
@ -106,6 +107,7 @@ private:
|
||||||
explicit constexpr TaggedThreadID(int id) : _id{id} {}
|
explicit constexpr TaggedThreadID(int id) : _id{id} {}
|
||||||
friend class OxenMQ;
|
friend class OxenMQ;
|
||||||
template <typename R> friend class Batch;
|
template <typename R> friend class Batch;
|
||||||
|
friend class Job;
|
||||||
};
|
};
|
||||||
|
|
||||||
/// Opaque handler for a timer constructed by add_timer(...). Safe (and cheap) to copy. The only
|
/// Opaque handler for a timer constructed by add_timer(...). Safe (and cheap) to copy. The only
|
||||||
|
@ -455,8 +457,9 @@ private:
|
||||||
/// Router socket to reach internal worker threads from proxy
|
/// Router socket to reach internal worker threads from proxy
|
||||||
zmq::socket_t workers_socket{context, zmq::socket_type::router};
|
zmq::socket_t workers_socket{context, zmq::socket_type::router};
|
||||||
|
|
||||||
/// indices of idle, active workers
|
/// indices of idle, active workers; note that this vector is usually oversized
|
||||||
std::vector<unsigned int> idle_workers;
|
std::vector<unsigned int> idle_workers;
|
||||||
|
size_t idle_worker_count = 0; // Actual # elements of idle_workers in use
|
||||||
|
|
||||||
/// Maximum number of general task workers, specified by set_general_threads()
|
/// Maximum number of general task workers, specified by set_general_threads()
|
||||||
int general_workers = std::max<int>(1, std::thread::hardware_concurrency());
|
int general_workers = std::max<int>(1, std::thread::hardware_concurrency());
|
||||||
|
@ -468,7 +471,7 @@ private:
|
||||||
int max_workers;
|
int max_workers;
|
||||||
|
|
||||||
/// Number of active workers
|
/// Number of active workers
|
||||||
int active_workers() const { return workers.size() - idle_workers.size(); }
|
int active_workers() const { return workers.size() - idle_worker_count; }
|
||||||
|
|
||||||
/// Worker thread loop. Tagged and start are provided for a tagged worker thread.
|
/// Worker thread loop. Tagged and start are provided for a tagged worker thread.
|
||||||
void worker_thread(unsigned int index, std::optional<std::string> tagged = std::nullopt, std::function<void()> start = nullptr);
|
void worker_thread(unsigned int index, std::optional<std::string> tagged = std::nullopt, std::function<void()> start = nullptr);
|
||||||
|
@ -483,7 +486,9 @@ private:
|
||||||
|
|
||||||
void proxy_conn_cleanup();
|
void proxy_conn_cleanup();
|
||||||
|
|
||||||
void proxy_worker_message(std::vector<zmq::message_t>& parts);
|
using control_message_array = std::array<zmq::message_t, 3>;
|
||||||
|
|
||||||
|
void proxy_worker_message(control_message_array& parts, size_t len);
|
||||||
|
|
||||||
void proxy_process_queue();
|
void proxy_process_queue();
|
||||||
|
|
||||||
|
@ -575,18 +580,17 @@ private:
|
||||||
/// weaker (i.e. it cannot reconnect to the SN if the connection is no longer open).
|
/// weaker (i.e. it cannot reconnect to the SN if the connection is no longer open).
|
||||||
void proxy_reply(oxenc::bt_dict_consumer data);
|
void proxy_reply(oxenc::bt_dict_consumer data);
|
||||||
|
|
||||||
/// Currently active batch/reply jobs; this is the container that owns the Batch instances
|
|
||||||
std::unordered_set<detail::Batch*> batches;
|
|
||||||
/// Individual batch jobs waiting to run; .second is the 0-n batch number or -1 for the
|
/// Individual batch jobs waiting to run; .second is the 0-n batch number or -1 for the
|
||||||
/// completion job
|
/// completion job
|
||||||
using batch_job = std::pair<detail::Batch*, int>;
|
using batch_job = std::pair<detail::Batch*, int>;
|
||||||
std::queue<batch_job> batch_jobs, reply_jobs;
|
using batch_queue = std::deque<batch_job>;
|
||||||
|
batch_queue batch_jobs, reply_jobs;
|
||||||
int batch_jobs_active = 0;
|
int batch_jobs_active = 0;
|
||||||
int reply_jobs_active = 0;
|
int reply_jobs_active = 0;
|
||||||
int batch_jobs_reserved = -1;
|
int batch_jobs_reserved = -1;
|
||||||
int reply_jobs_reserved = -1;
|
int reply_jobs_reserved = -1;
|
||||||
/// Runs any queued batch jobs
|
/// Runs any queued batch jobs
|
||||||
void proxy_run_batch_jobs(std::queue<batch_job>& jobs, int reserved, int& active, bool reply);
|
void proxy_run_batch_jobs(batch_queue& jobs, int reserved, int& active, bool reply);
|
||||||
|
|
||||||
/// BATCH command. Called with a Batch<R> (see oxenmq/batch.h) object pointer for the proxy to
|
/// BATCH command. Called with a Batch<R> (see oxenmq/batch.h) object pointer for the proxy to
|
||||||
/// take over and queue batch jobs.
|
/// take over and queue batch jobs.
|
||||||
|
@ -608,7 +612,7 @@ private:
|
||||||
void process_zap_requests();
|
void process_zap_requests();
|
||||||
|
|
||||||
/// Handles a control message from some outer thread to the proxy
|
/// Handles a control message from some outer thread to the proxy
|
||||||
void proxy_control_message(std::vector<zmq::message_t>& parts);
|
void proxy_control_message(control_message_array& parts, size_t len);
|
||||||
|
|
||||||
/// Closing any idle connections that have outlived their idle time. Note that this only
|
/// Closing any idle connections that have outlived their idle time. Note that this only
|
||||||
/// affects outgoing connections; incomings connections are the responsibility of the other end.
|
/// affects outgoing connections; incomings connections are the responsibility of the other end.
|
||||||
|
@ -744,8 +748,9 @@ private:
|
||||||
|
|
||||||
// These belong to the proxy thread and must not be accessed by a worker:
|
// These belong to the proxy thread and must not be accessed by a worker:
|
||||||
std::thread worker_thread;
|
std::thread worker_thread;
|
||||||
size_t worker_id; // The index in `workers` (0-n) or index+1 in `tagged_workers` (1-n)
|
uint32_t worker_id; // The index in `workers` (0-n) or index+1 in `tagged_workers` (1-n)
|
||||||
std::string worker_routing_id; // "w123" where 123 == worker_id; "n123" for tagged threads.
|
std::string worker_routing_id; // "wXXXX" where XXXX is the raw bytes of worker_id, or tXXXX for tagged threads.
|
||||||
|
std::string worker_routing_name; // "w123" or "t123" -- human readable version of worker_routing_id
|
||||||
|
|
||||||
/// Loads the run info with an incoming command
|
/// Loads the run info with an incoming command
|
||||||
run_info& load(category* cat, std::string command, ConnectionID conn, Access access, std::string remote,
|
run_info& load(category* cat, std::string command, ConnectionID conn, Access access, std::string remote,
|
||||||
|
@ -769,7 +774,7 @@ private:
|
||||||
/// Workers that are reserved for tagged thread tasks (as created with add_tagged_thread). The
|
/// Workers that are reserved for tagged thread tasks (as created with add_tagged_thread). The
|
||||||
/// queue here is similar to worker_jobs, but contains only the tagged thread's jobs. The bool
|
/// queue here is similar to worker_jobs, but contains only the tagged thread's jobs. The bool
|
||||||
/// is whether the worker is currently busy (true) or available (false).
|
/// is whether the worker is currently busy (true) or available (false).
|
||||||
std::vector<std::tuple<run_info, bool, std::queue<batch_job>>> tagged_workers;
|
std::vector<std::tuple<run_info, bool, batch_queue>> tagged_workers;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -271,14 +271,14 @@ void OxenMQ::proxy_reply(oxenc::bt_dict_consumer data) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void OxenMQ::proxy_control_message(std::vector<zmq::message_t>& parts) {
|
void OxenMQ::proxy_control_message(OxenMQ::control_message_array& parts, size_t len) {
|
||||||
// We throw an uncaught exception here because we only generate control messages internally in
|
// We throw an uncaught exception here because we only generate control messages internally in
|
||||||
// oxenmq code: if one of these condition fail it's a oxenmq bug.
|
// oxenmq code: if one of these condition fail it's a oxenmq bug.
|
||||||
if (parts.size() < 2)
|
if (len < 2)
|
||||||
throw std::logic_error("OxenMQ bug: Expected 2-3 message parts for a proxy control message");
|
throw std::logic_error("OxenMQ bug: Expected 2-3 message parts for a proxy control message");
|
||||||
auto route = view(parts[0]), cmd = view(parts[1]);
|
auto route = view(parts[0]), cmd = view(parts[1]);
|
||||||
OMQ_TRACE("control message: ", cmd);
|
OMQ_TRACE("control message: ", cmd);
|
||||||
if (parts.size() == 3) {
|
if (len == 3) {
|
||||||
OMQ_TRACE("...: ", parts[2]);
|
OMQ_TRACE("...: ", parts[2]);
|
||||||
auto data = view(parts[2]);
|
auto data = view(parts[2]);
|
||||||
if (cmd == "SEND") {
|
if (cmd == "SEND") {
|
||||||
|
@ -315,7 +315,7 @@ void OxenMQ::proxy_control_message(std::vector<zmq::message_t>& parts) {
|
||||||
bind.push_back(std::move(b));
|
bind.push_back(std::move(b));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} else if (parts.size() == 2) {
|
} else if (len == 2) {
|
||||||
if (cmd == "START") {
|
if (cmd == "START") {
|
||||||
// Command send by the owning thread during startup; we send back a simple READY reply to
|
// Command send by the owning thread during startup; we send back a simple READY reply to
|
||||||
// let it know we are running.
|
// let it know we are running.
|
||||||
|
@ -325,9 +325,9 @@ void OxenMQ::proxy_control_message(std::vector<zmq::message_t>& parts) {
|
||||||
// close workers as they come back to READY status, and then close external
|
// close workers as they come back to READY status, and then close external
|
||||||
// connections once all workers are done.
|
// connections once all workers are done.
|
||||||
max_workers = 0;
|
max_workers = 0;
|
||||||
for (const auto &route : idle_workers)
|
for (size_t i = 0; i < idle_worker_count; i++)
|
||||||
route_control(workers_socket, workers[route].worker_routing_id, "QUIT");
|
route_control(workers_socket, workers[idle_workers[i]].worker_routing_id, "QUIT");
|
||||||
idle_workers.clear();
|
idle_worker_count = 0;
|
||||||
for (auto& [run, busy, queue] : tagged_workers)
|
for (auto& [run, busy, queue] : tagged_workers)
|
||||||
if (!busy)
|
if (!busy)
|
||||||
route_control(workers_socket, run.worker_routing_id, "QUIT");
|
route_control(workers_socket, run.worker_routing_id, "QUIT");
|
||||||
|
@ -335,7 +335,7 @@ void OxenMQ::proxy_control_message(std::vector<zmq::message_t>& parts) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
throw std::runtime_error("OxenMQ bug: Proxy received invalid control command: " +
|
throw std::runtime_error("OxenMQ bug: Proxy received invalid control command: " +
|
||||||
std::string{cmd} + " (" + std::to_string(parts.size()) + ")");
|
std::string{cmd} + " (" + std::to_string(len) + ")");
|
||||||
}
|
}
|
||||||
|
|
||||||
bool OxenMQ::proxy_bind(bind_data& b, size_t bind_index) {
|
bool OxenMQ::proxy_bind(bind_data& b, size_t bind_index) {
|
||||||
|
@ -404,6 +404,7 @@ void OxenMQ::proxy_loop_init() {
|
||||||
}
|
}
|
||||||
|
|
||||||
workers.reserve(max_workers);
|
workers.reserve(max_workers);
|
||||||
|
idle_workers.resize(max_workers);
|
||||||
if (!workers.empty())
|
if (!workers.empty())
|
||||||
throw std::logic_error("Internal error: proxy thread started with active worker threads");
|
throw std::logic_error("Internal error: proxy thread started with active worker threads");
|
||||||
|
|
||||||
|
@ -482,7 +483,7 @@ void OxenMQ::proxy_loop_init() {
|
||||||
}
|
}
|
||||||
|
|
||||||
for (auto&w : tagged_workers) {
|
for (auto&w : tagged_workers) {
|
||||||
OMQ_LOG(debug, "Telling tagged thread worker ", std::get<run_info>(w).worker_routing_id, " to finish startup");
|
OMQ_LOG(debug, "Telling tagged thread worker ", std::get<run_info>(w).worker_routing_name, " to finish startup");
|
||||||
route_control(workers_socket, std::get<run_info>(w).worker_routing_id, "START");
|
route_control(workers_socket, std::get<run_info>(w).worker_routing_id, "START");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -497,6 +498,10 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
|
||||||
}
|
}
|
||||||
startup.set_value();
|
startup.set_value();
|
||||||
|
|
||||||
|
// Fixed array used for worker and control messages: these are never longer than 3 parts:
|
||||||
|
std::array<zmq::message_t, 3> control_parts;
|
||||||
|
|
||||||
|
// General vector for handling incoming messages:
|
||||||
std::vector<zmq::message_t> parts;
|
std::vector<zmq::message_t> parts;
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
|
@ -529,13 +534,13 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
|
||||||
|
|
||||||
OMQ_TRACE("processing control messages");
|
OMQ_TRACE("processing control messages");
|
||||||
// Retrieve any waiting incoming control messages
|
// Retrieve any waiting incoming control messages
|
||||||
for (parts.clear(); recv_message_parts(command, parts, zmq::recv_flags::dontwait); parts.clear()) {
|
while (size_t len = recv_message_parts(command, control_parts, zmq::recv_flags::dontwait)) {
|
||||||
proxy_control_message(parts);
|
proxy_control_message(control_parts, len);
|
||||||
}
|
}
|
||||||
|
|
||||||
OMQ_TRACE("processing worker messages");
|
OMQ_TRACE("processing worker messages");
|
||||||
for (parts.clear(); recv_message_parts(workers_socket, parts, zmq::recv_flags::dontwait); parts.clear()) {
|
while (size_t len = recv_message_parts(workers_socket, control_parts, zmq::recv_flags::dontwait)) {
|
||||||
proxy_worker_message(parts);
|
proxy_worker_message(control_parts, len);
|
||||||
}
|
}
|
||||||
|
|
||||||
OMQ_TRACE("processing timers");
|
OMQ_TRACE("processing timers");
|
||||||
|
@ -731,7 +736,7 @@ void OxenMQ::proxy_process_queue() {
|
||||||
if (!busy && !queue.empty()) {
|
if (!busy && !queue.empty()) {
|
||||||
busy = true;
|
busy = true;
|
||||||
proxy_run_worker(run.load(std::move(queue.front()), false, run.worker_id));
|
proxy_run_worker(run.load(std::move(queue.front()), false, run.worker_id));
|
||||||
queue.pop();
|
queue.pop_front();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -48,10 +48,11 @@ bool worker_wait_for(OxenMQ& omq, zmq::socket_t& sock, std::vector<zmq::message_
|
||||||
}
|
}
|
||||||
|
|
||||||
void OxenMQ::worker_thread(unsigned int index, std::optional<std::string> tagged, std::function<void()> start) {
|
void OxenMQ::worker_thread(unsigned int index, std::optional<std::string> tagged, std::function<void()> start) {
|
||||||
std::string routing_id = (tagged ? "t" : "w") + std::to_string(index); // for routing
|
std::string routing_id = (tagged ? "t" : "w") +
|
||||||
std::string_view worker_id{tagged ? *tagged : routing_id}; // for debug
|
std::string(reinterpret_cast<const char*>(&index), sizeof(index)); // for routing
|
||||||
|
std::string worker_id{tagged ? *tagged : "w" + std::to_string(index)}; // for debug
|
||||||
|
|
||||||
[[maybe_unused]] std::string thread_name = tagged.value_or("omq-" + routing_id);
|
[[maybe_unused]] std::string thread_name = tagged.value_or("omq-" + worker_id);
|
||||||
#if defined(__linux__) || defined(__sun) || defined(__MINGW32__)
|
#if defined(__linux__) || defined(__sun) || defined(__MINGW32__)
|
||||||
if (thread_name.size() > 15) thread_name.resize(15);
|
if (thread_name.size() > 15) thread_name.resize(15);
|
||||||
pthread_setname_np(pthread_self(), thread_name.c_str());
|
pthread_setname_np(pthread_self(), thread_name.c_str());
|
||||||
|
@ -161,39 +162,37 @@ void OxenMQ::worker_thread(unsigned int index, std::optional<std::string> tagged
|
||||||
|
|
||||||
|
|
||||||
OxenMQ::run_info& OxenMQ::get_idle_worker() {
|
OxenMQ::run_info& OxenMQ::get_idle_worker() {
|
||||||
if (idle_workers.empty()) {
|
if (idle_worker_count == 0) {
|
||||||
size_t id = workers.size();
|
uint32_t id = workers.size();
|
||||||
assert(workers.capacity() > id);
|
|
||||||
workers.emplace_back();
|
workers.emplace_back();
|
||||||
auto& r = workers.back();
|
auto& r = workers.back();
|
||||||
r.worker_id = id;
|
r.worker_id = id;
|
||||||
r.worker_routing_id = "w" + std::to_string(id);
|
r.worker_routing_id = "w" + std::string(reinterpret_cast<const char*>(&id), sizeof(id));
|
||||||
|
r.worker_routing_name = "w" + std::to_string(id);
|
||||||
return r;
|
return r;
|
||||||
}
|
}
|
||||||
size_t id = idle_workers.back();
|
size_t id = idle_workers[--idle_worker_count];
|
||||||
idle_workers.pop_back();
|
|
||||||
return workers[id];
|
return workers[id];
|
||||||
}
|
}
|
||||||
|
|
||||||
void OxenMQ::proxy_worker_message(std::vector<zmq::message_t>& parts) {
|
void OxenMQ::proxy_worker_message(OxenMQ::control_message_array& parts, size_t len) {
|
||||||
// Process messages sent by workers
|
// Process messages sent by workers
|
||||||
if (parts.size() != 2) {
|
if (len != 2) {
|
||||||
OMQ_LOG(error, "Received send invalid ", parts.size(), "-part message");
|
OMQ_LOG(error, "Received send invalid ", len, "-part message");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
auto route = view(parts[0]), cmd = view(parts[1]);
|
auto route = view(parts[0]), cmd = view(parts[1]);
|
||||||
OMQ_TRACE("worker message from ", route);
|
if (route.size() != 5 || (route[0] != 'w' && route[0] != 't')) {
|
||||||
assert(route.size() >= 2 && (route[0] == 'w' || route[0] == 't') && route[1] >= '0' && route[1] <= '9');
|
OMQ_LOG(error, "Received malformed worker id in worker message; unable to process worker command");
|
||||||
|
return;
|
||||||
|
}
|
||||||
bool tagged_worker = route[0] == 't';
|
bool tagged_worker = route[0] == 't';
|
||||||
std::string_view worker_id_str{&route[1], route.size()-1}; // Chop off the leading "w" (or "t")
|
uint32_t worker_id;
|
||||||
unsigned int worker_id = oxenc::detail::extract_unsigned(worker_id_str);
|
std::memcpy(&worker_id, route.data() + 1, 4);
|
||||||
if (!worker_id_str.empty() /* didn't consume everything */ ||
|
if (tagged_worker
|
||||||
(tagged_worker
|
? 0 == worker_id || worker_id > tagged_workers.size() // tagged worker ids are indexed from 1 to N (0 means untagged)
|
||||||
? 0 == worker_id || worker_id > tagged_workers.size() // tagged worker ids are indexed from 1 to N (0 means untagged)
|
: worker_id >= workers.size()) { // regular worker ids are indexed from 0 to N-1
|
||||||
: worker_id >= workers.size() // regular worker ids are indexed from 0 to N-1
|
OMQ_LOG(error, "Received invalid worker id w" + std::to_string(worker_id) + " in worker message; unable to process worker command");
|
||||||
)
|
|
||||||
) {
|
|
||||||
OMQ_LOG(error, "Worker id '", route, "' is invalid, unable to process worker command");
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -233,11 +232,11 @@ void OxenMQ::proxy_worker_message(std::vector<zmq::message_t>& parts) {
|
||||||
} else {
|
} else {
|
||||||
auto& jobs =
|
auto& jobs =
|
||||||
thread > 0
|
thread > 0
|
||||||
? std::get<std::queue<batch_job>>(tagged_workers[thread - 1]) // run in tagged thread
|
? std::get<batch_queue>(tagged_workers[thread - 1]) // run in tagged thread
|
||||||
: run.is_reply_job
|
: run.is_reply_job
|
||||||
? reply_jobs
|
? reply_jobs
|
||||||
: batch_jobs;
|
: batch_jobs;
|
||||||
jobs.emplace(batch, -1);
|
jobs.emplace_back(batch, -1);
|
||||||
}
|
}
|
||||||
} else if (state == detail::BatchState::done) {
|
} else if (state == detail::BatchState::done) {
|
||||||
// No completion job
|
// No completion job
|
||||||
|
@ -247,9 +246,7 @@ void OxenMQ::proxy_worker_message(std::vector<zmq::message_t>& parts) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (clear_job) {
|
if (clear_job) {
|
||||||
batches.erase(batch);
|
|
||||||
delete batch;
|
delete batch;
|
||||||
run.to_run = static_cast<detail::Batch*>(nullptr);
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
assert(run.cat->active_threads > 0);
|
assert(run.cat->active_threads > 0);
|
||||||
|
@ -259,7 +256,7 @@ void OxenMQ::proxy_worker_message(std::vector<zmq::message_t>& parts) {
|
||||||
OMQ_TRACE("Telling worker ", route, " to quit");
|
OMQ_TRACE("Telling worker ", route, " to quit");
|
||||||
route_control(workers_socket, route, "QUIT");
|
route_control(workers_socket, route, "QUIT");
|
||||||
} else if (!tagged_worker) {
|
} else if (!tagged_worker) {
|
||||||
idle_workers.push_back(worker_id);
|
idle_workers[idle_worker_count++] = worker_id;
|
||||||
}
|
}
|
||||||
} else if (cmd == "QUITTING"sv) {
|
} else if (cmd == "QUITTING"sv) {
|
||||||
run.worker_thread.join();
|
run.worker_thread.join();
|
||||||
|
@ -381,7 +378,7 @@ void OxenMQ::proxy_to_worker(int64_t conn_id, zmq::socket_t& sock, std::vector<z
|
||||||
peer->activity(); // outgoing connection activity, pump the activity timer
|
peer->activity(); // outgoing connection activity, pump the activity timer
|
||||||
|
|
||||||
OMQ_TRACE("Forwarding incoming ", run.command, " from ", run.conn, " @ ", peer_address(parts[command_part_index]),
|
OMQ_TRACE("Forwarding incoming ", run.command, " from ", run.conn, " @ ", peer_address(parts[command_part_index]),
|
||||||
" to worker ", run.worker_routing_id);
|
" to worker ", run.worker_routing_name);
|
||||||
|
|
||||||
proxy_run_worker(run);
|
proxy_run_worker(run);
|
||||||
category.active_threads++;
|
category.active_threads++;
|
||||||
|
@ -412,7 +409,7 @@ void OxenMQ::proxy_inject_task(injected_task task) {
|
||||||
}
|
}
|
||||||
|
|
||||||
auto& run = get_idle_worker();
|
auto& run = get_idle_worker();
|
||||||
OMQ_TRACE("Forwarding incoming injected task ", task.command, " from ", task.remote, " to worker ", run.worker_routing_id);
|
OMQ_TRACE("Forwarding incoming injected task ", task.command, " from ", task.remote, " to worker ", run.worker_routing_name);
|
||||||
run.load(&category, std::move(task.command), std::move(task.remote), std::move(task.callback));
|
run.load(&category, std::move(task.command), std::move(task.remote), std::move(task.callback));
|
||||||
|
|
||||||
proxy_run_worker(run);
|
proxy_run_worker(run);
|
||||||
|
|
Loading…
Reference in New Issue