mirror of https://github.com/oxen-io/oxen-mq.git
Compare commits
9 Commits
ae86ebd93c
...
faccac8e84
Author | SHA1 | Date |
---|---|---|
Jason Rhinelander | faccac8e84 | |
Jason Rhinelander | a27961d787 | |
Jason Rhinelander | 5878473f67 | |
Jason Rhinelander | 68b3420bad | |
Jason Rhinelander | dc7fb35493 | |
Jason Rhinelander | caadd35052 | |
Jason Rhinelander | fd58ab9cac | |
Jason Rhinelander | 8f97add30f | |
Jason Rhinelander | a5f1574af7 |
|
@ -81,8 +81,8 @@ local full_llvm(version) = debian_pipeline(
|
|||
[
|
||||
debian_pipeline('Debian sid (amd64)', docker_base + 'debian-sid', distro='sid'),
|
||||
debian_pipeline('Debian sid/Debug (amd64)', docker_base + 'debian-sid', build_type='Debug', distro='sid'),
|
||||
clang(14),
|
||||
full_llvm(14),
|
||||
clang(16),
|
||||
full_llvm(16),
|
||||
debian_pipeline('Debian buster (amd64)', docker_base + 'debian-buster'),
|
||||
debian_pipeline('Debian stable (i386)', docker_base + 'debian-stable/i386'),
|
||||
debian_pipeline('Debian sid (ARM64)', docker_base + 'debian-sid', arch='arm64', distro='sid'),
|
||||
|
|
|
@ -17,11 +17,13 @@ cmake_minimum_required(VERSION 3.7)
|
|||
set(CMAKE_OSX_DEPLOYMENT_TARGET 10.12 CACHE STRING "macOS deployment target (Apple clang only)")
|
||||
|
||||
project(liboxenmq
|
||||
VERSION 1.2.15
|
||||
VERSION 1.2.16
|
||||
LANGUAGES CXX C)
|
||||
|
||||
include(GNUInstallDirs)
|
||||
|
||||
include(cmake/libatomic.cmake)
|
||||
|
||||
message(STATUS "oxenmq v${PROJECT_VERSION}")
|
||||
|
||||
set(OXENMQ_LIBVERSION 0)
|
||||
|
@ -39,9 +41,15 @@ set(oxenmq_INSTALL_DEFAULT OFF)
|
|||
if(BUILD_SHARED_LIBS OR oxenmq_IS_TOPLEVEL_PROJECT)
|
||||
set(oxenmq_INSTALL_DEFAULT ON)
|
||||
endif()
|
||||
set(oxenmq_EPOLL_DEFAULT OFF)
|
||||
if(CMAKE_SYSTEM_NAME STREQUAL "Linux" AND NOT CMAKE_CROSSCOMPILING)
|
||||
set(oxenmq_EPOLL_DEFAULT ON)
|
||||
endif()
|
||||
|
||||
option(OXENMQ_BUILD_TESTS "Building and perform oxenmq tests" ${oxenmq_IS_TOPLEVEL_PROJECT})
|
||||
option(OXENMQ_INSTALL "Add oxenmq libraries and headers to cmake install target; defaults to ON if BUILD_SHARED_LIBS is enabled or we are the top-level project; OFF for a static subdirectory build" ${oxenmq_INSTALL_DEFAULT})
|
||||
option(OXENMQ_INSTALL_CPPZMQ "Install cppzmq header with oxenmq/ headers (requires OXENMQ_INSTALL)" ON)
|
||||
option(OXENMQ_USE_EPOLL "Use epoll for socket polling (requires Linux)" ${oxenmq_EPOLL_DEFAULT})
|
||||
|
||||
list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake")
|
||||
|
||||
|
@ -59,6 +67,9 @@ add_library(oxenmq
|
|||
oxenmq/worker.cpp
|
||||
)
|
||||
set_target_properties(oxenmq PROPERTIES SOVERSION ${OXENMQ_LIBVERSION})
|
||||
if(OXENMQ_USE_EPOLL)
|
||||
target_compile_definitions(oxenmq PRIVATE OXENMQ_USE_EPOLL)
|
||||
endif()
|
||||
|
||||
set(THREADS_PREFER_PTHREAD_FLAG ON)
|
||||
find_package(Threads REQUIRED)
|
||||
|
|
|
@ -0,0 +1,49 @@
|
|||
include(CheckCXXSourceCompiles)
|
||||
|
||||
function(check_working_cxx_atomics64 varname)
|
||||
set(OLD_CMAKE_REQUIRED_FLAGS ${CMAKE_REQUIRED_FLAGS})
|
||||
if (EMBEDDED_CFG)
|
||||
set(CMAKE_REQUIRED_FLAGS "${CMAKE_REQUIRED_FLAGS} -m32 -march=i486")
|
||||
elseif(MSVC OR MSVC_VERSION)
|
||||
set(CMAKE_REQUIRED_FLAGS "${CMAKE_REQUIRED_FLAGS} -arch:IA32 -std:c++14")
|
||||
else()
|
||||
# CMAKE_CXX_STANDARD does not propagate to cmake compile tests
|
||||
set(CMAKE_REQUIRED_FLAGS "${CMAKE_REQUIRED_FLAGS} -std=c++14")
|
||||
endif()
|
||||
check_cxx_source_compiles("
|
||||
#include <atomic>
|
||||
#include <cstdint>
|
||||
std::atomic<uint64_t> x (0);
|
||||
int main() {
|
||||
uint64_t i = x.load(std::memory_order_relaxed);
|
||||
return 0;
|
||||
}
|
||||
" ${varname})
|
||||
set(CMAKE_REQUIRED_FLAGS ${OLD_CMAKE_REQUIRED_FLAGS})
|
||||
endfunction()
|
||||
|
||||
check_working_cxx_atomics64(HAVE_CXX_ATOMICS64_WITHOUT_LIB)
|
||||
|
||||
if(HAVE_CXX_ATOMICS64_WITHOUT_LIB)
|
||||
message(STATUS "Have working 64bit atomics")
|
||||
return()
|
||||
endif()
|
||||
|
||||
if (NOT MSVC AND NOT MSVC_VERSION)
|
||||
check_library_exists(atomic __atomic_load_8 "" HAVE_CXX_LIBATOMICS64)
|
||||
if (HAVE_CXX_LIBATOMICS64)
|
||||
message(STATUS "Have 64bit atomics via library")
|
||||
list(APPEND CMAKE_REQUIRED_LIBRARIES "atomic")
|
||||
check_working_cxx_atomics64(HAVE_CXX_ATOMICS64_WITH_LIB)
|
||||
if (HAVE_CXX_ATOMICS64_WITH_LIB)
|
||||
message(STATUS "Can link with libatomic")
|
||||
link_libraries(-latomic)
|
||||
return()
|
||||
endif()
|
||||
endif()
|
||||
endif()
|
||||
if (MSVC OR MSVC_VERSION)
|
||||
message(FATAL_ERROR "Host compiler must support 64-bit std::atomic! (What does MSVC do to inline atomics?)")
|
||||
else()
|
||||
message(FATAL_ERROR "Host compiler must support 64-bit std::atomic!")
|
||||
endif()
|
|
@ -1,7 +1,7 @@
|
|||
set(LIBZMQ_PREFIX ${CMAKE_BINARY_DIR}/libzmq)
|
||||
set(ZeroMQ_VERSION 4.3.4)
|
||||
set(ZeroMQ_VERSION 4.3.5)
|
||||
set(LIBZMQ_URL https://github.com/zeromq/libzmq/releases/download/v${ZeroMQ_VERSION}/zeromq-${ZeroMQ_VERSION}.tar.gz)
|
||||
set(LIBZMQ_HASH SHA512=e198ef9f82d392754caadd547537666d4fba0afd7d027749b3adae450516bcf284d241d4616cad3cb4ad9af8c10373d456de92dc6d115b037941659f141e7c0e)
|
||||
set(LIBZMQ_HASH SHA512=a71d48aa977ad8941c1609947d8db2679fc7a951e4cd0c3a1127ae026d883c11bd4203cf315de87f95f5031aec459a731aec34e5ce5b667b8d0559b157952541)
|
||||
|
||||
message(${LIBZMQ_URL})
|
||||
|
||||
|
@ -13,13 +13,23 @@ endif()
|
|||
|
||||
file(MAKE_DIRECTORY ${LIBZMQ_PREFIX}/include)
|
||||
|
||||
set(libzmq_compiler_args)
|
||||
foreach(lang C CXX)
|
||||
foreach(thing COMPILER FLAGS COMPILER_LAUNCHER)
|
||||
if(DEFINED CMAKE_${lang}_${thing})
|
||||
list(APPEND libzmq_compiler_args "-DCMAKE_${lang}_${thing}=${CMAKE_${lang}_${thing}}")
|
||||
endif()
|
||||
endforeach()
|
||||
endforeach()
|
||||
|
||||
include(ExternalProject)
|
||||
include(ProcessorCount)
|
||||
ExternalProject_Add(libzmq_external
|
||||
PREFIX ${LIBZMQ_PREFIX}
|
||||
URL ${LIBZMQ_URL}
|
||||
URL_HASH ${LIBZMQ_HASH}
|
||||
CMAKE_ARGS -DCMAKE_C_COMPILER=${CMAKE_C_COMPILER} -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER}
|
||||
CMAKE_ARGS ${libzmq_compiler_args}
|
||||
-DCMAKE_BUILD_TYPE=Release
|
||||
-DWITH_LIBSODIUM=ON -DZMQ_BUILD_TESTS=OFF -DWITH_PERF_TOOL=OFF -DENABLE_DRAFTS=OFF
|
||||
-DBUILD_SHARED=OFF -DBUILD_STATIC=ON -DWITH_DOC=OFF -DCMAKE_INSTALL_PREFIX=${LIBZMQ_PREFIX}
|
||||
BUILD_BYPRODUCTS ${LIBZMQ_PREFIX}/${CMAKE_INSTALL_LIBDIR}/libzmq.a
|
||||
|
|
|
@ -1 +1 @@
|
|||
Subproject commit 462be41bd481b331dabeb3c220b349ef35c89e56
|
||||
Subproject commit d6f300d7d250ae0a9708090c0011c0f495377e6a
|
|
@ -3,12 +3,47 @@
|
|||
#include <oxenc/hex.h>
|
||||
#include <optional>
|
||||
|
||||
#ifdef OXENMQ_USE_EPOLL
|
||||
extern "C" {
|
||||
#include <sys/epoll.h>
|
||||
#include <unistd.h>
|
||||
}
|
||||
#endif
|
||||
|
||||
namespace oxenmq {
|
||||
|
||||
std::ostream& operator<<(std::ostream& o, const ConnectionID& conn) {
|
||||
return o << conn.to_string();
|
||||
}
|
||||
|
||||
#ifdef OXENMQ_USE_EPOLL
|
||||
|
||||
void OxenMQ::rebuild_pollitems() {
|
||||
|
||||
if (epoll_fd != -1)
|
||||
close(epoll_fd);
|
||||
epoll_fd = epoll_create1(0);
|
||||
|
||||
struct epoll_event ev;
|
||||
ev.events = EPOLLIN | EPOLLET;
|
||||
ev.data.u64 = EPOLL_COMMAND_ID;
|
||||
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, command.get(zmq::sockopt::fd), &ev);
|
||||
|
||||
ev.data.u64 = EPOLL_WORKER_ID;
|
||||
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, workers_socket.get(zmq::sockopt::fd), &ev);
|
||||
|
||||
ev.data.u64 = EPOLL_ZAP_ID;
|
||||
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, zap_auth.get(zmq::sockopt::fd), &ev);
|
||||
|
||||
for (auto& [id, s] : connections) {
|
||||
ev.data.u64 = id;
|
||||
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, s.get(zmq::sockopt::fd), &ev);
|
||||
}
|
||||
connections_updated = false;
|
||||
}
|
||||
|
||||
#else // !OXENMQ_USE_EPOLL
|
||||
|
||||
namespace {
|
||||
|
||||
void add_pollitem(std::vector<zmq::pollitem_t>& pollitems, zmq::socket_t& sock) {
|
||||
|
@ -33,6 +68,8 @@ void OxenMQ::rebuild_pollitems() {
|
|||
connections_updated = false;
|
||||
}
|
||||
|
||||
#endif // OXENMQ_USE_EPOLL
|
||||
|
||||
void OxenMQ::setup_external_socket(zmq::socket_t& socket) {
|
||||
socket.set(zmq::sockopt::reconnect_ivl, (int) RECONNECT_INTERVAL.count());
|
||||
socket.set(zmq::sockopt::reconnect_ivl_max, (int) RECONNECT_INTERVAL_MAX.count());
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
#pragma once
|
||||
#include <limits>
|
||||
#include "oxenmq.h"
|
||||
|
||||
// Inside some method:
|
||||
|
@ -20,6 +21,14 @@ constexpr char SN_ADDR_WORKERS[] = "inproc://sn-workers";
|
|||
constexpr char SN_ADDR_SELF[] = "inproc://sn-self";
|
||||
constexpr char ZMQ_ADDR_ZAP[] = "inproc://zeromq.zap.01";
|
||||
|
||||
#ifdef OXENMQ_USE_EPOLL
|
||||
|
||||
constexpr auto EPOLL_COMMAND_ID = std::numeric_limits<uint64_t>::max();
|
||||
constexpr auto EPOLL_WORKER_ID = std::numeric_limits<uint64_t>::max() - 1;
|
||||
constexpr auto EPOLL_ZAP_ID = std::numeric_limits<uint64_t>::max() - 2;
|
||||
|
||||
#endif
|
||||
|
||||
/// Destructor for create_message(std::string&&) that zmq calls when it's done with the message.
|
||||
extern "C" inline void message_buffer_destroy(void*, void* hint) {
|
||||
delete reinterpret_cast<std::string*>(hint);
|
||||
|
|
|
@ -420,8 +420,14 @@ private:
|
|||
/// sockets for inter-thread communication followed by a pollitem for every connection (both
|
||||
/// incoming and outgoing) in `connections`. We rebuild this from `connections` whenever
|
||||
/// `connections_updated` is set to true.
|
||||
///
|
||||
/// On Linux, when using epoll, this is not used.
|
||||
std::vector<zmq::pollitem_t> pollitems;
|
||||
|
||||
/// On Linux, when using epoll, this tracks the epoll file descriptor. Otherwise it does
|
||||
/// nothing.
|
||||
int epoll_fd = -1;
|
||||
|
||||
/// Rebuilds pollitems to include the internal sockets + all incoming/outgoing sockets.
|
||||
void rebuild_pollitems();
|
||||
|
||||
|
|
127
oxenmq/proxy.cpp
127
oxenmq/proxy.cpp
|
@ -11,6 +11,10 @@ extern "C" {
|
|||
}
|
||||
#endif
|
||||
|
||||
#ifdef OXENMQ_USE_EPOLL
|
||||
#include <sys/epoll.h>
|
||||
#endif
|
||||
|
||||
#ifndef _WIN32
|
||||
extern "C" {
|
||||
#include <sys/stat.h>
|
||||
|
@ -496,6 +500,12 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
|
|||
// General vector for handling incoming messages:
|
||||
std::vector<zmq::message_t> parts;
|
||||
|
||||
std::vector<std::pair<const int64_t, zmq::socket_t>*> queue; // Used as a circular buffer
|
||||
|
||||
#ifdef OXENMQ_USE_EPOLL
|
||||
std::vector<struct epoll_event> evs;
|
||||
#endif
|
||||
|
||||
while (true) {
|
||||
std::chrono::milliseconds poll_timeout;
|
||||
if (max_workers == 0) { // Will be 0 only if we are quitting
|
||||
|
@ -509,9 +519,52 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
|
|||
poll_timeout = std::chrono::milliseconds{zmq_timers_timeout(timers.get())};
|
||||
}
|
||||
|
||||
if (connections_updated)
|
||||
if (connections_updated) {
|
||||
rebuild_pollitems();
|
||||
// If we just rebuilt the queue then do a full check of everything, because we might
|
||||
// have sockets that already edge-triggered that we need to fully drain before we start
|
||||
// polling.
|
||||
proxy_skip_one_poll = true;
|
||||
}
|
||||
|
||||
// We round-robin connections when pulling off pending messages one-by-one rather than
|
||||
// pulling off all messages from one connection before moving to the next; thus in cases of
|
||||
// contention we end up fairly distributing.
|
||||
queue.reserve(connections.size() + 1);
|
||||
|
||||
#ifdef OXENMQ_USE_EPOLL
|
||||
bool process_command = false, process_worker = false, process_zap = false, process_all = false;
|
||||
|
||||
if (proxy_skip_one_poll) {
|
||||
proxy_skip_one_poll = false;
|
||||
|
||||
process_command = command.get(zmq::sockopt::events) & ZMQ_POLLIN;
|
||||
process_worker = workers_socket.get(zmq::sockopt::events) & ZMQ_POLLIN;
|
||||
process_zap = zap_auth.get(zmq::sockopt::events) & ZMQ_POLLIN;
|
||||
process_all = true;
|
||||
}
|
||||
else {
|
||||
OMQ_TRACE("polling for new messages via epoll");
|
||||
|
||||
evs.resize(3 + connections.size());
|
||||
const int max = epoll_wait(epoll_fd, evs.data(), evs.size(), poll_timeout.count());
|
||||
|
||||
queue.clear();
|
||||
for (int i = 0; i < max; i++) {
|
||||
const auto conn_id = evs[i].data.u64;
|
||||
if (conn_id == EPOLL_COMMAND_ID)
|
||||
process_command = true;
|
||||
else if (conn_id == EPOLL_WORKER_ID)
|
||||
process_worker = true;
|
||||
else if (conn_id == EPOLL_ZAP_ID)
|
||||
process_zap = true;
|
||||
else if (auto it = connections.find(conn_id); it != connections.end())
|
||||
queue.push_back(&*it);
|
||||
}
|
||||
queue.push_back(nullptr);
|
||||
}
|
||||
|
||||
#else
|
||||
if (proxy_skip_one_poll)
|
||||
proxy_skip_one_poll = false;
|
||||
else {
|
||||
|
@ -524,23 +577,29 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
|
|||
zmq::poll(pollitems.data(), pollitems.size(), poll_timeout);
|
||||
}
|
||||
|
||||
OMQ_TRACE("processing control messages");
|
||||
// Retrieve any waiting incoming control messages
|
||||
while (size_t len = recv_message_parts(command, control_parts, zmq::recv_flags::dontwait)) {
|
||||
proxy_control_message(control_parts, len);
|
||||
constexpr bool process_command = true, process_worker = true, process_zap = true, process_all = true;
|
||||
#endif
|
||||
|
||||
if (process_command) {
|
||||
OMQ_TRACE("processing control messages");
|
||||
while (size_t len = recv_message_parts(command, control_parts, zmq::recv_flags::dontwait))
|
||||
proxy_control_message(control_parts, len);
|
||||
}
|
||||
|
||||
OMQ_TRACE("processing worker messages");
|
||||
while (size_t len = recv_message_parts(workers_socket, control_parts, zmq::recv_flags::dontwait)) {
|
||||
proxy_worker_message(control_parts, len);
|
||||
if (process_worker) {
|
||||
OMQ_TRACE("processing worker messages");
|
||||
while (size_t len = recv_message_parts(workers_socket, control_parts, zmq::recv_flags::dontwait))
|
||||
proxy_worker_message(control_parts, len);
|
||||
}
|
||||
|
||||
OMQ_TRACE("processing timers");
|
||||
zmq_timers_execute(timers.get());
|
||||
|
||||
// Handle any zap authentication
|
||||
OMQ_TRACE("processing zap requests");
|
||||
process_zap_requests();
|
||||
if (process_zap) {
|
||||
// Handle any zap authentication
|
||||
OMQ_TRACE("processing zap requests");
|
||||
process_zap_requests();
|
||||
}
|
||||
|
||||
// See if we can drain anything from the current queue before we potentially add to it
|
||||
// below.
|
||||
|
@ -548,15 +607,14 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
|
|||
proxy_process_queue();
|
||||
|
||||
OMQ_TRACE("processing new incoming messages");
|
||||
if (process_all) {
|
||||
queue.clear();
|
||||
for (auto& id_sock : connections)
|
||||
if (id_sock.second.get(zmq::sockopt::events) & ZMQ_POLLIN)
|
||||
queue.push_back(&id_sock);
|
||||
queue.push_back(nullptr);
|
||||
}
|
||||
|
||||
// We round-robin connections when pulling off pending messages one-by-one rather than
|
||||
// pulling off all messages from one connection before moving to the next; thus in cases of
|
||||
// contention we end up fairly distributing.
|
||||
std::vector<std::pair<const int64_t, zmq::socket_t>*> queue; // Used as a circular buffer
|
||||
queue.reserve(connections.size() + 1);
|
||||
for (auto& id_sock : connections)
|
||||
queue.push_back(&id_sock);
|
||||
queue.push_back(nullptr);
|
||||
size_t end = queue.size() - 1;
|
||||
|
||||
for (size_t pos = 0; pos != end; ++pos %= queue.size()) {
|
||||
|
@ -580,13 +638,40 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
|
|||
proxy_to_worker(id, sock, parts);
|
||||
|
||||
if (connections_updated) {
|
||||
// If connections got updated then our points are stale, to restart the proxy loop;
|
||||
// if there are still messages waiting we'll end up right back here.
|
||||
// If connections got updated then our points are stale, so restart the proxy loop;
|
||||
// we'll immediately end up right back here at least once before we resume polling.
|
||||
OMQ_TRACE("connections became stale; short-circuiting incoming message loop");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
#ifdef OXENMQ_USE_EPOLL
|
||||
// If any socket still has ZMQ_POLLIN (which is possible if something we did above changed
|
||||
// state on another socket, perhaps by writing to it) then we need to repeat the loop
|
||||
// *without* going back to epoll again, until we get through everything without any
|
||||
// ZMQ_POLLIN sockets. If we didn't, we could miss it and might end up deadlocked because
|
||||
// of ZMQ's edge-triggered notifications on zmq fd's.
|
||||
//
|
||||
// More info on the complexities here at https://github.com/zeromq/libzmq/issues/3641 and
|
||||
// https://funcptr.net/2012/09/10/zeromq---edge-triggered-notification/
|
||||
if (!connections_updated && !proxy_skip_one_poll) {
|
||||
for (auto* s : {&command, &workers_socket, &zap_auth}) {
|
||||
if (s->get(zmq::sockopt::events) & ZMQ_POLLIN) {
|
||||
proxy_skip_one_poll = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!proxy_skip_one_poll) {
|
||||
for (auto& [id, sock] : connections) {
|
||||
if (sock.get(zmq::sockopt::events) & ZMQ_POLLIN) {
|
||||
proxy_skip_one_poll = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
OMQ_TRACE("done proxy loop");
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue