mirror of
https://github.com/oxen-io/oxen-mq.git
synced 2023-12-13 21:00:31 +01:00
commit
e7487fd0c8
8 changed files with 107 additions and 109 deletions
|
@ -31,9 +31,9 @@ local debian_pipeline(name, image, arch='amd64', deps='g++ libsodium-dev libzmq3
|
||||||
cmake_extra='-DCMAKE_C_COMPILER=gcc-8 -DCMAKE_CXX_COMPILER=g++-8'),
|
cmake_extra='-DCMAKE_C_COMPILER=gcc-8 -DCMAKE_CXX_COMPILER=g++-8'),
|
||||||
debian_pipeline("Debian sid (amd64)", "debian:sid"),
|
debian_pipeline("Debian sid (amd64)", "debian:sid"),
|
||||||
debian_pipeline("Debian sid/Debug (amd64)", "debian:sid", build_type='Debug'),
|
debian_pipeline("Debian sid/Debug (amd64)", "debian:sid", build_type='Debug'),
|
||||||
debian_pipeline("Debian sid/clang-10 (amd64)", "debian:sid", deps='clang-10 lld-10 libsodium-dev libzmq3-dev',
|
debian_pipeline("Debian sid/clang-11 (amd64)", "debian:sid", deps='clang-11 lld-11 libsodium-dev libzmq3-dev',
|
||||||
cmake_extra='-DCMAKE_C_COMPILER=clang-10 -DCMAKE_CXX_COMPILER=clang++-10 ' + std.join(' ', [
|
cmake_extra='-DCMAKE_C_COMPILER=clang-11 -DCMAKE_CXX_COMPILER=clang++-11 ' + std.join(' ', [
|
||||||
'-DCMAKE_'+type+'_LINKER_FLAGS=-fuse-ld=lld-10' for type in ['EXE','MODULE','SHARED','STATIC']])),
|
'-DCMAKE_'+type+'_LINKER_FLAGS=-fuse-ld=lld-11' for type in ['EXE','MODULE','SHARED','STATIC']])),
|
||||||
debian_pipeline("Debian buster (amd64)", "debian:buster"),
|
debian_pipeline("Debian buster (amd64)", "debian:buster"),
|
||||||
debian_pipeline("Debian buster (i386)", "i386/debian:buster"),
|
debian_pipeline("Debian buster (i386)", "i386/debian:buster"),
|
||||||
debian_pipeline("Ubuntu bionic (ARM64)", "ubuntu:bionic", arch="arm64", deps='libsodium-dev g++-8',
|
debian_pipeline("Ubuntu bionic (ARM64)", "ubuntu:bionic", arch="arm64", deps='libsodium-dev g++-8',
|
||||||
|
|
|
@ -9,16 +9,29 @@ include(GNUInstallDirs)
|
||||||
|
|
||||||
set(LOKIMQ_VERSION_MAJOR 1)
|
set(LOKIMQ_VERSION_MAJOR 1)
|
||||||
set(LOKIMQ_VERSION_MINOR 2)
|
set(LOKIMQ_VERSION_MINOR 2)
|
||||||
set(LOKIMQ_VERSION_PATCH 1)
|
set(LOKIMQ_VERSION_PATCH 2)
|
||||||
set(LOKIMQ_VERSION "${LOKIMQ_VERSION_MAJOR}.${LOKIMQ_VERSION_MINOR}.${LOKIMQ_VERSION_PATCH}")
|
set(LOKIMQ_VERSION "${LOKIMQ_VERSION_MAJOR}.${LOKIMQ_VERSION_MINOR}.${LOKIMQ_VERSION_PATCH}")
|
||||||
message(STATUS "lokimq v${LOKIMQ_VERSION}")
|
message(STATUS "lokimq v${LOKIMQ_VERSION}")
|
||||||
|
|
||||||
set(LOKIMQ_LIBVERSION 0)
|
set(LOKIMQ_LIBVERSION 0)
|
||||||
|
|
||||||
|
|
||||||
option(BUILD_SHARED_LIBS "Build shared libraries instead of static ones" ON)
|
if(CMAKE_PROJECT_NAME STREQUAL PROJECT_NAME)
|
||||||
|
set(lokimq_IS_TOPLEVEL_PROJECT TRUE)
|
||||||
|
else()
|
||||||
|
set(lokimq_IS_TOPLEVEL_PROJECT FALSE)
|
||||||
|
endif()
|
||||||
|
|
||||||
|
|
||||||
|
option(BUILD_SHARED_LIBS "Build shared libraries instead of static ones" ON)
|
||||||
|
set(lokimq_INSTALL_DEFAULT OFF)
|
||||||
|
if(BUILD_SHARED_LIBS OR lokimq_IS_TOPLEVEL_PROJECT)
|
||||||
|
set(lokimq_INSTALL_DEFAULT ON)
|
||||||
|
endif()
|
||||||
|
option(LOKIMQ_BUILD_TESTS "Building and perform lokimq tests" ${lokimq_IS_TOPLEVEL_PROJECT})
|
||||||
|
option(LOKIMQ_INSTALL "Add lokimq libraries and headers to cmake install target; defaults to ON if BUILD_SHARED_LIBS is enabled or we are the top-level project; OFF for a static subdirectory build" ${lokimq_INSTALL_DEFAULT})
|
||||||
|
option(LOKIMQ_INSTALL_CPPZMQ "Install cppzmq header with lokimq/ headers (requires LOKIMQ_INSTALL)" ON)
|
||||||
|
|
||||||
list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake")
|
list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake")
|
||||||
|
|
||||||
configure_file(lokimq/version.h.in lokimq/version.h @ONLY)
|
configure_file(lokimq/version.h.in lokimq/version.h @ONLY)
|
||||||
|
@ -95,7 +108,6 @@ set_target_properties(lokimq PROPERTIES
|
||||||
function(link_dep_libs target linktype libdirs)
|
function(link_dep_libs target linktype libdirs)
|
||||||
foreach(lib ${ARGN})
|
foreach(lib ${ARGN})
|
||||||
find_library(link_lib-${lib} NAMES ${lib} PATHS ${libdirs})
|
find_library(link_lib-${lib} NAMES ${lib} PATHS ${libdirs})
|
||||||
message(STATUS "FIND ${lib} FOUND ${link_lib-${lib}}")
|
|
||||||
if(link_lib-${lib})
|
if(link_lib-${lib})
|
||||||
target_link_libraries(${target} ${linktype} ${link_lib-${lib}})
|
target_link_libraries(${target} ${linktype} ${link_lib-${lib}})
|
||||||
endif()
|
endif()
|
||||||
|
@ -110,6 +122,7 @@ if(TARGET sodium)
|
||||||
target_link_libraries(libzmq_vendor INTERFACE sodium)
|
target_link_libraries(libzmq_vendor INTERFACE sodium)
|
||||||
endif()
|
endif()
|
||||||
else()
|
else()
|
||||||
|
include(FindPkgConfig)
|
||||||
pkg_check_modules(sodium REQUIRED libsodium IMPORTED_TARGET)
|
pkg_check_modules(sodium REQUIRED libsodium IMPORTED_TARGET)
|
||||||
|
|
||||||
if(BUILD_SHARED_LIBS)
|
if(BUILD_SHARED_LIBS)
|
||||||
|
@ -134,52 +147,46 @@ export(
|
||||||
NAMESPACE lokimq::
|
NAMESPACE lokimq::
|
||||||
FILE lokimqTargets.cmake
|
FILE lokimqTargets.cmake
|
||||||
)
|
)
|
||||||
install(
|
|
||||||
TARGETS lokimq
|
|
||||||
EXPORT lokimqConfig
|
|
||||||
DESTINATION ${CMAKE_INSTALL_LIBDIR}
|
|
||||||
)
|
|
||||||
|
|
||||||
install(
|
if(LOKIMQ_INSTALL)
|
||||||
FILES lokimq/address.h
|
install(
|
||||||
lokimq/auth.h
|
TARGETS lokimq
|
||||||
lokimq/base32z.h
|
EXPORT lokimqConfig
|
||||||
lokimq/base64.h
|
DESTINATION ${CMAKE_INSTALL_LIBDIR}
|
||||||
lokimq/batch.h
|
)
|
||||||
lokimq/bt_serialize.h
|
|
||||||
lokimq/bt_value.h
|
|
||||||
lokimq/connections.h
|
|
||||||
lokimq/hex.h
|
|
||||||
lokimq/lokimq.h
|
|
||||||
lokimq/message.h
|
|
||||||
lokimq/string_view.h
|
|
||||||
lokimq/variant.h
|
|
||||||
${CMAKE_CURRENT_BINARY_DIR}/lokimq/version.h
|
|
||||||
DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/lokimq
|
|
||||||
)
|
|
||||||
|
|
||||||
option(LOKIMQ_INSTALL_CPPZMQ "Install cppzmq header with lokimq/ headers" ON)
|
install(
|
||||||
if(LOKIMQ_INSTALL_CPPZMQ)
|
FILES lokimq/address.h
|
||||||
install(
|
lokimq/auth.h
|
||||||
FILES cppzmq/zmq.hpp
|
lokimq/base32z.h
|
||||||
DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/lokimq
|
lokimq/base64.h
|
||||||
)
|
lokimq/batch.h
|
||||||
|
lokimq/bt_serialize.h
|
||||||
|
lokimq/bt_value.h
|
||||||
|
lokimq/connections.h
|
||||||
|
lokimq/hex.h
|
||||||
|
lokimq/lokimq.h
|
||||||
|
lokimq/message.h
|
||||||
|
lokimq/string_view.h
|
||||||
|
lokimq/variant.h
|
||||||
|
${CMAKE_CURRENT_BINARY_DIR}/lokimq/version.h
|
||||||
|
DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/lokimq
|
||||||
|
)
|
||||||
|
|
||||||
|
if(LOKIMQ_INSTALL_CPPZMQ)
|
||||||
|
install(
|
||||||
|
FILES cppzmq/zmq.hpp
|
||||||
|
DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/lokimq
|
||||||
|
)
|
||||||
|
endif()
|
||||||
|
|
||||||
|
|
||||||
|
install(
|
||||||
|
FILES ${CMAKE_CURRENT_BINARY_DIR}/liblokimq.pc
|
||||||
|
DESTINATION ${CMAKE_INSTALL_LIBDIR}/pkgconfig
|
||||||
|
)
|
||||||
endif()
|
endif()
|
||||||
|
|
||||||
|
|
||||||
install(
|
|
||||||
FILES ${CMAKE_CURRENT_BINARY_DIR}/liblokimq.pc
|
|
||||||
DESTINATION ${CMAKE_INSTALL_LIBDIR}/pkgconfig
|
|
||||||
)
|
|
||||||
|
|
||||||
if(CMAKE_PROJECT_NAME STREQUAL PROJECT_NAME)
|
|
||||||
set(lokimq_IS_TOPLEVEL_PROJECT TRUE)
|
|
||||||
else()
|
|
||||||
set(lokimq_IS_TOPLEVEL_PROJECT FALSE)
|
|
||||||
endif()
|
|
||||||
|
|
||||||
option(LOKIMQ_BUILD_TESTS "Building and perform lokimq tests" ${lokimq_IS_TOPLEVEL_PROJECT})
|
|
||||||
if(LOKIMQ_BUILD_TESTS)
|
if(LOKIMQ_BUILD_TESTS)
|
||||||
add_subdirectory(tests)
|
add_subdirectory(tests)
|
||||||
endif()
|
endif()
|
||||||
|
|
||||||
|
|
2
cppzmq
2
cppzmq
|
@ -1 +1 @@
|
||||||
Subproject commit 8d5c9a88988dcbebb72939ca0939d432230ffde1
|
Subproject commit 76bf169fd67b8e99c1b0e6490029d9cd5ef97666
|
|
@ -36,14 +36,14 @@ void LokiMQ::rebuild_pollitems() {
|
||||||
}
|
}
|
||||||
|
|
||||||
void LokiMQ::setup_external_socket(zmq::socket_t& socket) {
|
void LokiMQ::setup_external_socket(zmq::socket_t& socket) {
|
||||||
socket.setsockopt(ZMQ_RECONNECT_IVL, (int) RECONNECT_INTERVAL.count());
|
socket.set(zmq::sockopt::reconnect_ivl, (int) RECONNECT_INTERVAL.count());
|
||||||
socket.setsockopt(ZMQ_RECONNECT_IVL_MAX, (int) RECONNECT_INTERVAL_MAX.count());
|
socket.set(zmq::sockopt::reconnect_ivl_max, (int) RECONNECT_INTERVAL_MAX.count());
|
||||||
socket.setsockopt(ZMQ_HANDSHAKE_IVL, (int) HANDSHAKE_TIME.count());
|
socket.set(zmq::sockopt::handshake_ivl, (int) HANDSHAKE_TIME.count());
|
||||||
socket.setsockopt<int64_t>(ZMQ_MAXMSGSIZE, MAX_MSG_SIZE);
|
socket.set(zmq::sockopt::maxmsgsize, MAX_MSG_SIZE);
|
||||||
if (CONN_HEARTBEAT > 0s) {
|
if (CONN_HEARTBEAT > 0s) {
|
||||||
socket.setsockopt(ZMQ_HEARTBEAT_IVL, (int) CONN_HEARTBEAT.count());
|
socket.set(zmq::sockopt::heartbeat_ivl, (int) CONN_HEARTBEAT.count());
|
||||||
if (CONN_HEARTBEAT_TIMEOUT > 0s)
|
if (CONN_HEARTBEAT_TIMEOUT > 0s)
|
||||||
socket.setsockopt(ZMQ_HEARTBEAT_TIMEOUT, (int) CONN_HEARTBEAT_TIMEOUT.count());
|
socket.set(zmq::sockopt::heartbeat_timeout, (int) CONN_HEARTBEAT_TIMEOUT.count());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -52,9 +52,9 @@ void LokiMQ::setup_outgoing_socket(zmq::socket_t& socket, std::string_view remot
|
||||||
setup_external_socket(socket);
|
setup_external_socket(socket);
|
||||||
|
|
||||||
if (!remote_pubkey.empty()) {
|
if (!remote_pubkey.empty()) {
|
||||||
socket.setsockopt(ZMQ_CURVE_SERVERKEY, remote_pubkey.data(), remote_pubkey.size());
|
socket.set(zmq::sockopt::curve_serverkey, remote_pubkey);
|
||||||
socket.setsockopt(ZMQ_CURVE_PUBLICKEY, pubkey.data(), pubkey.size());
|
socket.set(zmq::sockopt::curve_publickey, pubkey);
|
||||||
socket.setsockopt(ZMQ_CURVE_SECRETKEY, privkey.data(), privkey.size());
|
socket.set(zmq::sockopt::curve_secretkey, privkey);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (PUBKEY_BASED_ROUTING_ID) {
|
if (PUBKEY_BASED_ROUTING_ID) {
|
||||||
|
@ -62,7 +62,7 @@ void LokiMQ::setup_outgoing_socket(zmq::socket_t& socket, std::string_view remot
|
||||||
routing_id.reserve(33);
|
routing_id.reserve(33);
|
||||||
routing_id += 'L'; // Prefix because routing id's starting with \0 are reserved by zmq (and our pubkey might start with \0)
|
routing_id += 'L'; // Prefix because routing id's starting with \0 are reserved by zmq (and our pubkey might start with \0)
|
||||||
routing_id.append(pubkey.begin(), pubkey.end());
|
routing_id.append(pubkey.begin(), pubkey.end());
|
||||||
socket.setsockopt(ZMQ_ROUTING_ID, routing_id.data(), routing_id.size());
|
socket.set(zmq::sockopt::routing_id, routing_id);
|
||||||
}
|
}
|
||||||
// else let ZMQ pick a random one
|
// else let ZMQ pick a random one
|
||||||
}
|
}
|
||||||
|
@ -227,7 +227,7 @@ void update_connection_indices(Container& c, size_t index, AccessIndex get_index
|
||||||
/// which can invalidate iterators on the various connection containers - if you don't want that,
|
/// which can invalidate iterators on the various connection containers - if you don't want that,
|
||||||
/// delete it first so that the container won't contain the element being deleted.
|
/// delete it first so that the container won't contain the element being deleted.
|
||||||
void LokiMQ::proxy_close_connection(size_t index, std::chrono::milliseconds linger) {
|
void LokiMQ::proxy_close_connection(size_t index, std::chrono::milliseconds linger) {
|
||||||
connections[index].setsockopt<int>(ZMQ_LINGER, linger > 0ms ? linger.count() : 0);
|
connections[index].set(zmq::sockopt::linger, linger > 0ms ? (int) linger.count() : 0);
|
||||||
pollitems_stale = true;
|
pollitems_stale = true;
|
||||||
connections.erase(connections.begin() + index);
|
connections.erase(connections.begin() + index);
|
||||||
|
|
||||||
|
|
|
@ -1,8 +1,10 @@
|
||||||
#include "lokimq.h"
|
#include "lokimq.h"
|
||||||
#include "lokimq-internal.h"
|
#include "lokimq-internal.h"
|
||||||
|
#include "zmq.hpp"
|
||||||
#include <map>
|
#include <map>
|
||||||
#include <random>
|
#include <random>
|
||||||
#include <ostream>
|
#include <ostream>
|
||||||
|
#include <thread>
|
||||||
|
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#include <sodium/core.h>
|
#include <sodium/core.h>
|
||||||
|
@ -74,8 +76,8 @@ std::pair<std::string, AuthLevel> extract_metadata(zmq::message_t& msg) {
|
||||||
|
|
||||||
} // namespace detail
|
} // namespace detail
|
||||||
|
|
||||||
int LokiMQ::set_zmq_context_option(int option, int value) {
|
void LokiMQ::set_zmq_context_option(zmq::ctxopt option, int value) {
|
||||||
return context.setctxopt(option, value);
|
context.set(option, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
void LokiMQ::log_level(LogLevel level) {
|
void LokiMQ::log_level(LogLevel level) {
|
||||||
|
@ -161,33 +163,28 @@ std::atomic<int> next_id{1};
|
||||||
zmq::socket_t& LokiMQ::get_control_socket() {
|
zmq::socket_t& LokiMQ::get_control_socket() {
|
||||||
assert(proxy_thread.joinable());
|
assert(proxy_thread.joinable());
|
||||||
|
|
||||||
// Maps the LokiMQ unique ID to a local thread command socket.
|
|
||||||
static thread_local std::map<int, std::shared_ptr<zmq::socket_t>> control_sockets;
|
|
||||||
static thread_local std::pair<int, std::shared_ptr<zmq::socket_t>> last{-1, nullptr};
|
|
||||||
|
|
||||||
// Optimize by caching the last value; LokiMQ is often a singleton and in that case we're
|
// Optimize by caching the last value; LokiMQ is often a singleton and in that case we're
|
||||||
// going to *always* hit this optimization. Even if it isn't, we're probably likely to need the
|
// going to *always* hit this optimization. Even if it isn't, we're probably likely to need the
|
||||||
// same control socket from the same thread multiple times sequentially so this may still help.
|
// same control socket from the same thread multiple times sequentially so this may still help.
|
||||||
if (object_id == last.first)
|
static thread_local int last_id = -1;
|
||||||
return *last.second;
|
static thread_local zmq::socket_t* last_socket = nullptr;
|
||||||
|
if (object_id == last_id)
|
||||||
auto it = control_sockets.find(object_id);
|
return *last_socket;
|
||||||
if (it != control_sockets.end()) {
|
|
||||||
last = *it;
|
|
||||||
return *last.second;
|
|
||||||
}
|
|
||||||
|
|
||||||
std::lock_guard lock{control_sockets_mutex};
|
std::lock_guard lock{control_sockets_mutex};
|
||||||
|
|
||||||
if (proxy_shutting_down)
|
if (proxy_shutting_down)
|
||||||
throw std::runtime_error("Unable to obtain LokiMQ control socket: proxy thread is shutting down");
|
throw std::runtime_error("Unable to obtain LokiMQ control socket: proxy thread is shutting down");
|
||||||
auto control = std::make_shared<zmq::socket_t>(context, zmq::socket_type::dealer);
|
|
||||||
control->setsockopt<int>(ZMQ_LINGER, 0);
|
auto& socket = control_sockets[std::this_thread::get_id()];
|
||||||
control->connect(SN_ADDR_COMMAND);
|
if (!socket) {
|
||||||
thread_control_sockets.push_back(control);
|
socket = std::make_unique<zmq::socket_t>(context, zmq::socket_type::dealer);
|
||||||
control_sockets.emplace(object_id, control);
|
socket->set(zmq::sockopt::linger, 0);
|
||||||
last.first = object_id;
|
socket->connect(SN_ADDR_COMMAND);
|
||||||
last.second = std::move(control);
|
}
|
||||||
return *last.second;
|
last_id = object_id;
|
||||||
|
last_socket = socket.get();
|
||||||
|
return *last_socket;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -243,9 +240,9 @@ void LokiMQ::start() {
|
||||||
|
|
||||||
LMQ_LOG(info, "Initializing LokiMQ ", bind.empty() ? "remote-only" : "listener", " with pubkey ", to_hex(pubkey));
|
LMQ_LOG(info, "Initializing LokiMQ ", bind.empty() ? "remote-only" : "listener", " with pubkey ", to_hex(pubkey));
|
||||||
|
|
||||||
int zmq_socket_limit = context.getctxopt(ZMQ_SOCKET_LIMIT);
|
int zmq_socket_limit = context.get(zmq::ctxopt::socket_limit);
|
||||||
if (MAX_SOCKETS > 1 && MAX_SOCKETS <= zmq_socket_limit)
|
if (MAX_SOCKETS > 1 && MAX_SOCKETS <= zmq_socket_limit)
|
||||||
context.setctxopt(ZMQ_MAX_SOCKETS, MAX_SOCKETS);
|
context.set(zmq::ctxopt::max_sockets, MAX_SOCKETS);
|
||||||
else
|
else
|
||||||
LMQ_LOG(error, "Not applying LokiMQ::MAX_SOCKETS setting: ", MAX_SOCKETS, " must be in [1, ", zmq_socket_limit, "]");
|
LMQ_LOG(error, "Not applying LokiMQ::MAX_SOCKETS setting: ", MAX_SOCKETS, " must be in [1, ", zmq_socket_limit, "]");
|
||||||
|
|
||||||
|
@ -400,7 +397,7 @@ LokiMQ::~LokiMQ() {
|
||||||
// proxy thread starts (and we're getting destructed here without a proxy thread). So
|
// proxy thread starts (and we're getting destructed here without a proxy thread). So
|
||||||
// we need to start listening on it here in the destructor so that we establish a
|
// we need to start listening on it here in the destructor so that we establish a
|
||||||
// connection and send the QUITs to the tagged worker threads.
|
// connection and send the QUITs to the tagged worker threads.
|
||||||
workers_socket.setsockopt<int>(ZMQ_ROUTER_MANDATORY, 1);
|
workers_socket.set(zmq::sockopt::router_mandatory, true);
|
||||||
workers_socket.bind(SN_ADDR_WORKERS);
|
workers_socket.bind(SN_ADDR_WORKERS);
|
||||||
for (auto& [run, busy, queue] : tagged_workers) {
|
for (auto& [run, busy, queue] : tagged_workers) {
|
||||||
while (true) {
|
while (true) {
|
||||||
|
|
|
@ -131,18 +131,15 @@ private:
|
||||||
|
|
||||||
/// We have one seldom-used mutex here: it is generally locked just once per thread (the first
|
/// We have one seldom-used mutex here: it is generally locked just once per thread (the first
|
||||||
/// time the thread calls get_control_socket()) and once more by the proxy thread when it shuts
|
/// time the thread calls get_control_socket()) and once more by the proxy thread when it shuts
|
||||||
/// down, and so will not be a contention point.
|
/// down.
|
||||||
std::mutex control_sockets_mutex;
|
std::mutex control_sockets_mutex;
|
||||||
|
|
||||||
/// Called to obtain a "command" socket that attaches to `control` to send commands to the
|
/// Called to obtain a "command" socket that attaches to `control` to send commands to the
|
||||||
/// proxy thread from other threads. This socket is unique per thread and LokiMQ instance.
|
/// proxy thread from other threads. This socket is unique per thread and LokiMQ instance.
|
||||||
zmq::socket_t& get_control_socket();
|
zmq::socket_t& get_control_socket();
|
||||||
|
|
||||||
/// Stores all of the sockets created in different threads via `get_control_socket`. This is
|
/// Per-thread control sockets used by lokimq threads to talk to this object's proxy thread.
|
||||||
/// only used during destruction to close all of those open sockets, and is protected by an
|
std::unordered_map<std::thread::id, std::unique_ptr<zmq::socket_t>> control_sockets;
|
||||||
/// internal mutex which is only locked by new threads getting a control socket and the
|
|
||||||
/// destructor.
|
|
||||||
std::vector<std::shared_ptr<zmq::socket_t>> thread_control_sockets;
|
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
|
||||||
|
@ -257,7 +254,7 @@ public:
|
||||||
std::chrono::milliseconds CONN_HEARTBEAT_TIMEOUT = 30s;
|
std::chrono::milliseconds CONN_HEARTBEAT_TIMEOUT = 30s;
|
||||||
|
|
||||||
/// Allows you to set options on the internal zmq context object. For advanced use only.
|
/// Allows you to set options on the internal zmq context object. For advanced use only.
|
||||||
int set_zmq_context_option(int option, int value);
|
void set_zmq_context_option(zmq::ctxopt option, int value);
|
||||||
|
|
||||||
/** The umask to apply when constructing sockets (which affects any new ipc:// listening sockets
|
/** The umask to apply when constructing sockets (which affects any new ipc:// listening sockets
|
||||||
* that get created). Does nothing if set to -1 (the default), and does nothing on Windows.
|
* that get created). Does nothing if set to -1 (the default), and does nothing on Windows.
|
||||||
|
|
|
@ -25,18 +25,16 @@ void LokiMQ::proxy_quit() {
|
||||||
assert(std::none_of(workers.begin(), workers.end(), [](auto& worker) { return worker.worker_thread.joinable(); }));
|
assert(std::none_of(workers.begin(), workers.end(), [](auto& worker) { return worker.worker_thread.joinable(); }));
|
||||||
assert(std::none_of(tagged_workers.begin(), tagged_workers.end(), [](auto& worker) { return std::get<0>(worker).worker_thread.joinable(); }));
|
assert(std::none_of(tagged_workers.begin(), tagged_workers.end(), [](auto& worker) { return std::get<0>(worker).worker_thread.joinable(); }));
|
||||||
|
|
||||||
command.setsockopt<int>(ZMQ_LINGER, 0);
|
command.set(zmq::sockopt::linger, 0);
|
||||||
command.close();
|
command.close();
|
||||||
{
|
{
|
||||||
std::lock_guard lock{control_sockets_mutex};
|
std::lock_guard lock{control_sockets_mutex};
|
||||||
for (auto &control : thread_control_sockets)
|
|
||||||
control->close();
|
|
||||||
proxy_shutting_down = true; // To prevent threads from opening new control sockets
|
proxy_shutting_down = true; // To prevent threads from opening new control sockets
|
||||||
}
|
}
|
||||||
workers_socket.close();
|
workers_socket.close();
|
||||||
int linger = std::chrono::milliseconds{CLOSE_LINGER}.count();
|
int linger = std::chrono::milliseconds{CLOSE_LINGER}.count();
|
||||||
for (auto& s : connections)
|
for (auto& s : connections)
|
||||||
s.setsockopt(ZMQ_LINGER, linger);
|
s.set(zmq::sockopt::linger, linger);
|
||||||
connections.clear();
|
connections.clear();
|
||||||
peers.clear();
|
peers.clear();
|
||||||
|
|
||||||
|
@ -322,10 +320,10 @@ void LokiMQ::proxy_loop() {
|
||||||
pthread_setname_np("lmq-proxy");
|
pthread_setname_np("lmq-proxy");
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
zap_auth.setsockopt<int>(ZMQ_LINGER, 0);
|
zap_auth.set(zmq::sockopt::linger, 0);
|
||||||
zap_auth.bind(ZMQ_ADDR_ZAP);
|
zap_auth.bind(ZMQ_ADDR_ZAP);
|
||||||
|
|
||||||
workers_socket.setsockopt<int>(ZMQ_ROUTER_MANDATORY, 1);
|
workers_socket.set(zmq::sockopt::router_mandatory, true);
|
||||||
workers_socket.bind(SN_ADDR_WORKERS);
|
workers_socket.bind(SN_ADDR_WORKERS);
|
||||||
|
|
||||||
assert(general_workers > 0);
|
assert(general_workers > 0);
|
||||||
|
@ -362,16 +360,15 @@ void LokiMQ::proxy_loop() {
|
||||||
auto& b = bind[i].second;
|
auto& b = bind[i].second;
|
||||||
zmq::socket_t listener{context, zmq::socket_type::router};
|
zmq::socket_t listener{context, zmq::socket_type::router};
|
||||||
|
|
||||||
std::string auth_domain = bt_serialize(i);
|
|
||||||
setup_external_socket(listener);
|
setup_external_socket(listener);
|
||||||
listener.setsockopt(ZMQ_ZAP_DOMAIN, auth_domain.c_str(), auth_domain.size());
|
listener.set(zmq::sockopt::zap_domain, bt_serialize(i));
|
||||||
if (b.curve) {
|
if (b.curve) {
|
||||||
listener.setsockopt<int>(ZMQ_CURVE_SERVER, 1);
|
listener.set(zmq::sockopt::curve_server, true);
|
||||||
listener.setsockopt(ZMQ_CURVE_PUBLICKEY, pubkey.data(), pubkey.size());
|
listener.set(zmq::sockopt::curve_publickey, pubkey);
|
||||||
listener.setsockopt(ZMQ_CURVE_SECRETKEY, privkey.data(), privkey.size());
|
listener.set(zmq::sockopt::curve_secretkey, privkey);
|
||||||
}
|
}
|
||||||
listener.setsockopt<int>(ZMQ_ROUTER_HANDOVER, 1);
|
listener.set(zmq::sockopt::router_handover, true);
|
||||||
listener.setsockopt<int>(ZMQ_ROUTER_MANDATORY, 1);
|
listener.set(zmq::sockopt::router_mandatory, true);
|
||||||
|
|
||||||
listener.bind(bind[i].first);
|
listener.bind(bind[i].first);
|
||||||
LMQ_LOG(info, "LokiMQ listening on ", bind[i].first);
|
LMQ_LOG(info, "LokiMQ listening on ", bind[i].first);
|
||||||
|
@ -552,7 +549,7 @@ static bool is_error_response(std::string_view cmd) {
|
||||||
// reason)
|
// reason)
|
||||||
bool LokiMQ::proxy_handle_builtin(size_t conn_index, std::vector<zmq::message_t>& parts) {
|
bool LokiMQ::proxy_handle_builtin(size_t conn_index, std::vector<zmq::message_t>& parts) {
|
||||||
// Doubling as a bool and an offset:
|
// Doubling as a bool and an offset:
|
||||||
size_t incoming = connections[conn_index].getsockopt<int>(ZMQ_TYPE) == ZMQ_ROUTER;
|
size_t incoming = connections[conn_index].get(zmq::sockopt::type) == ZMQ_ROUTER;
|
||||||
|
|
||||||
std::string_view route, cmd;
|
std::string_view route, cmd;
|
||||||
if (parts.size() < 1 + incoming) {
|
if (parts.size() < 1 + incoming) {
|
||||||
|
|
|
@ -35,7 +35,7 @@ bool worker_wait_for(LokiMQ& lmq, zmq::socket_t& sock, std::vector<zmq::message_
|
||||||
} else if (command == "QUIT"sv) {
|
} else if (command == "QUIT"sv) {
|
||||||
lmq.log(LogLevel::debug, __FILE__, __LINE__, "Worker ", worker_id, " received QUIT command, shutting down");
|
lmq.log(LogLevel::debug, __FILE__, __LINE__, "Worker ", worker_id, " received QUIT command, shutting down");
|
||||||
detail::send_control(sock, "QUITTING");
|
detail::send_control(sock, "QUITTING");
|
||||||
sock.setsockopt<int>(ZMQ_LINGER, 1000);
|
sock.set(zmq::sockopt::linger, 1000);
|
||||||
sock.close();
|
sock.close();
|
||||||
return false;
|
return false;
|
||||||
} else {
|
} else {
|
||||||
|
@ -61,7 +61,7 @@ void LokiMQ::worker_thread(unsigned int index, std::optional<std::string> tagged
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
zmq::socket_t sock{context, zmq::socket_type::dealer};
|
zmq::socket_t sock{context, zmq::socket_type::dealer};
|
||||||
sock.setsockopt(ZMQ_ROUTING_ID, routing_id.data(), routing_id.size());
|
sock.set(zmq::sockopt::routing_id, routing_id);
|
||||||
LMQ_LOG(debug, "New worker thread ", worker_id, " (", routing_id, ") started");
|
LMQ_LOG(debug, "New worker thread ", worker_id, " (", routing_id, ") started");
|
||||||
sock.connect(SN_ADDR_WORKERS);
|
sock.connect(SN_ADDR_WORKERS);
|
||||||
if (tagged)
|
if (tagged)
|
||||||
|
@ -276,7 +276,7 @@ void LokiMQ::proxy_run_worker(run_info& run) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void LokiMQ::proxy_to_worker(size_t conn_index, std::vector<zmq::message_t>& parts) {
|
void LokiMQ::proxy_to_worker(size_t conn_index, std::vector<zmq::message_t>& parts) {
|
||||||
bool outgoing = connections[conn_index].getsockopt<int>(ZMQ_TYPE) == ZMQ_DEALER;
|
bool outgoing = connections[conn_index].get(zmq::sockopt::type) == ZMQ_DEALER;
|
||||||
|
|
||||||
peer_info tmp_peer;
|
peer_info tmp_peer;
|
||||||
tmp_peer.conn_index = conn_index;
|
tmp_peer.conn_index = conn_index;
|
||||||
|
|
Loading…
Reference in a new issue