Browse Source

Rename LokiMQ to OxenMQ

pull/31/head
Jason Rhinelander 1 year ago
parent
commit
2ae6b96016
  1. 144
      CMakeLists.txt
  2. 8
      liboxenmq.pc.in
  3. 5
      lokimq/version.h.in
  4. 8
      oxenmq/address.cpp
  5. 6
      oxenmq/address.h
  6. 26
      oxenmq/auth.cpp
  7. 4
      oxenmq/auth.h
  8. 4
      oxenmq/base32z.h
  9. 4
      oxenmq/base64.h
  10. 18
      oxenmq/batch.h
  11. 6
      oxenmq/bt_serialize.cpp
  12. 10
      oxenmq/bt_serialize.h
  13. 4
      oxenmq/bt_value.h
  14. 2
      oxenmq/byte_type.h
  15. 36
      oxenmq/connections.cpp
  16. 12
      oxenmq/connections.h
  17. 4
      oxenmq/hex.h
  18. 32
      oxenmq/jobs.cpp
  19. 10
      oxenmq/message.h
  20. 4
      oxenmq/oxenmq-internal.h
  21. 76
      oxenmq/oxenmq.cpp
  22. 116
      oxenmq/oxenmq.h
  23. 30
      oxenmq/proxy.cpp
  24. 2
      oxenmq/string_view.h
  25. 0
      oxenmq/variant.h
  26. 5
      oxenmq/version.h.in
  27. 26
      oxenmq/worker.cpp
  28. 2
      tests/CMakeLists.txt
  29. 6
      tests/common.h
  30. 2
      tests/test_address.cpp
  31. 18
      tests/test_batch.cpp
  32. 16
      tests/test_bt.cpp
  33. 36
      tests/test_commands.cpp
  34. 30
      tests/test_connect.cpp
  35. 198
      tests/test_encoding.cpp
  36. 14
      tests/test_failures.cpp
  37. 15
      tests/test_inject.cpp
  38. 20
      tests/test_requests.cpp
  39. 14
      tests/test_tagged_threads.cpp

144
CMakeLists.txt

@ -3,63 +3,63 @@ cmake_minimum_required(VERSION 3.7)
# Has to be set before `project()`, and ignored on non-macos:
set(CMAKE_OSX_DEPLOYMENT_TARGET 10.12 CACHE STRING "macOS deployment target (Apple clang only)")
project(liblokimq CXX C)
project(liboxenmq CXX C)
include(GNUInstallDirs)
set(LOKIMQ_VERSION_MAJOR 1)
set(LOKIMQ_VERSION_MINOR 2)
set(LOKIMQ_VERSION_PATCH 3)
set(LOKIMQ_VERSION "${LOKIMQ_VERSION_MAJOR}.${LOKIMQ_VERSION_MINOR}.${LOKIMQ_VERSION_PATCH}")
message(STATUS "lokimq v${LOKIMQ_VERSION}")
set(OXENMQ_VERSION_MAJOR 1)
set(OXENMQ_VERSION_MINOR 2)
set(OXENMQ_VERSION_PATCH 3)
set(OXENMQ_VERSION "${OXENMQ_VERSION_MAJOR}.${OXENMQ_VERSION_MINOR}.${OXENMQ_VERSION_PATCH}")
message(STATUS "oxenmq v${OXENMQ_VERSION}")
set(LOKIMQ_LIBVERSION 0)
set(OXENMQ_LIBVERSION 0)
if(CMAKE_PROJECT_NAME STREQUAL PROJECT_NAME)
set(lokimq_IS_TOPLEVEL_PROJECT TRUE)
set(oxenmq_IS_TOPLEVEL_PROJECT TRUE)
else()
set(lokimq_IS_TOPLEVEL_PROJECT FALSE)
set(oxenmq_IS_TOPLEVEL_PROJECT FALSE)
endif()
option(BUILD_SHARED_LIBS "Build shared libraries instead of static ones" ON)
set(lokimq_INSTALL_DEFAULT OFF)
if(BUILD_SHARED_LIBS OR lokimq_IS_TOPLEVEL_PROJECT)
set(lokimq_INSTALL_DEFAULT ON)
set(oxenmq_INSTALL_DEFAULT OFF)
if(BUILD_SHARED_LIBS OR oxenmq_IS_TOPLEVEL_PROJECT)
set(oxenmq_INSTALL_DEFAULT ON)
endif()
option(LOKIMQ_BUILD_TESTS "Building and perform lokimq tests" ${lokimq_IS_TOPLEVEL_PROJECT})
option(LOKIMQ_INSTALL "Add lokimq libraries and headers to cmake install target; defaults to ON if BUILD_SHARED_LIBS is enabled or we are the top-level project; OFF for a static subdirectory build" ${lokimq_INSTALL_DEFAULT})
option(LOKIMQ_INSTALL_CPPZMQ "Install cppzmq header with lokimq/ headers (requires LOKIMQ_INSTALL)" ON)
option(OXENMQ_BUILD_TESTS "Building and perform oxenmq tests" ${oxenmq_IS_TOPLEVEL_PROJECT})
option(OXENMQ_INSTALL "Add oxenmq libraries and headers to cmake install target; defaults to ON if BUILD_SHARED_LIBS is enabled or we are the top-level project; OFF for a static subdirectory build" ${oxenmq_INSTALL_DEFAULT})
option(OXENMQ_INSTALL_CPPZMQ "Install cppzmq header with oxenmq/ headers (requires OXENMQ_INSTALL)" ON)
list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake")
configure_file(lokimq/version.h.in lokimq/version.h @ONLY)
configure_file(liblokimq.pc.in liblokimq.pc @ONLY)
add_library(lokimq
lokimq/address.cpp
lokimq/auth.cpp
lokimq/bt_serialize.cpp
lokimq/connections.cpp
lokimq/jobs.cpp
lokimq/lokimq.cpp
lokimq/proxy.cpp
lokimq/worker.cpp
configure_file(oxenmq/version.h.in oxenmq/version.h @ONLY)
configure_file(liboxenmq.pc.in liboxenmq.pc @ONLY)
add_library(oxenmq
oxenmq/address.cpp
oxenmq/auth.cpp
oxenmq/bt_serialize.cpp
oxenmq/connections.cpp
oxenmq/jobs.cpp
oxenmq/oxenmq.cpp
oxenmq/proxy.cpp
oxenmq/worker.cpp
)
set_target_properties(lokimq PROPERTIES SOVERSION ${LOKIMQ_LIBVERSION})
set_target_properties(oxenmq PROPERTIES SOVERSION ${OXENMQ_LIBVERSION})
set(THREADS_PREFER_PTHREAD_FLAG ON)
find_package(Threads REQUIRED)
target_link_libraries(lokimq PRIVATE Threads::Threads)
target_link_libraries(oxenmq PRIVATE Threads::Threads)
# libzmq is nearly impossible to link statically from a system-installed static library: it depends
# on a ton of other libraries, some of which are not all statically available. If the caller wants
# to mess with this, so be it: they can set up a libzmq target and we'll use it. Otherwise if they
# asked us to do things statically, don't even try to find a system lib and just build it.
set(lokimq_build_static_libzmq OFF)
set(oxenmq_build_static_libzmq OFF)
if(TARGET libzmq)
target_link_libraries(lokimq PUBLIC libzmq)
target_link_libraries(oxenmq PUBLIC libzmq)
elseif(BUILD_SHARED_LIBS)
include(FindPkgConfig)
pkg_check_modules(libzmq libzmq>=4.3 IMPORTED_TARGET)
@ -75,30 +75,30 @@ elseif(BUILD_SHARED_LIBS)
set_property(TARGET PkgConfig::libzmq PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${zmq_inc})
endif()
target_link_libraries(lokimq PUBLIC PkgConfig::libzmq)
target_link_libraries(oxenmq PUBLIC PkgConfig::libzmq)
else()
set(lokimq_build_static_libzmq ON)
set(oxenmq_build_static_libzmq ON)
endif()
else()
set(lokimq_build_static_libzmq ON)
set(oxenmq_build_static_libzmq ON)
endif()
if(lokimq_build_static_libzmq)
if(oxenmq_build_static_libzmq)
message(STATUS "libzmq >= 4.3 not found or static build requested, building bundled version")
list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake/local-libzmq")
include(LocalLibzmq)
target_link_libraries(lokimq PUBLIC libzmq_vendor)
target_link_libraries(oxenmq PUBLIC libzmq_vendor)
endif()
target_include_directories(lokimq
target_include_directories(oxenmq
PUBLIC
$<INSTALL_INTERFACE:>
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/cppzmq>
)
target_compile_options(lokimq PRIVATE -Wall -Wextra -Werror)
set_target_properties(lokimq PROPERTIES
target_compile_options(oxenmq PRIVATE -Wall -Wextra -Werror)
set_target_properties(oxenmq PROPERTIES
CXX_STANDARD 17
CXX_STANDARD_REQUIRED ON
CXX_EXTENSIONS OFF
@ -117,8 +117,8 @@ endfunction()
# If the caller has already set up a sodium target then we will just link to it, otherwise we go
# looking for it.
if(TARGET sodium)
target_link_libraries(lokimq PUBLIC sodium)
if(lokimq_build_static_libzmq)
target_link_libraries(oxenmq PUBLIC sodium)
if(oxenmq_build_static_libzmq)
target_link_libraries(libzmq_vendor INTERFACE sodium)
endif()
else()
@ -126,67 +126,67 @@ else()
pkg_check_modules(sodium REQUIRED libsodium IMPORTED_TARGET)
if(BUILD_SHARED_LIBS)
target_link_libraries(lokimq PUBLIC PkgConfig::sodium)
if(lokimq_build_static_libzmq)
target_link_libraries(oxenmq PUBLIC PkgConfig::sodium)
if(oxenmq_build_static_libzmq)
target_link_libraries(libzmq_vendor INTERFACE PkgConfig::sodium)
endif()
else()
link_dep_libs(lokimq PUBLIC "${sodium_STATIC_LIBRARY_DIRS}" ${sodium_STATIC_LIBRARIES})
target_include_directories(lokimq PUBLIC ${sodium_STATIC_INCLUDE_DIRS})
if(lokimq_build_static_libzmq)
link_dep_libs(oxenmq PUBLIC "${sodium_STATIC_LIBRARY_DIRS}" ${sodium_STATIC_LIBRARIES})
target_include_directories(oxenmq PUBLIC ${sodium_STATIC_INCLUDE_DIRS})
if(oxenmq_build_static_libzmq)
link_dep_libs(libzmq_vendor INTERFACE "${sodium_STATIC_LIBRARY_DIRS}" ${sodium_STATIC_LIBRARIES})
target_link_libraries(libzmq_vendor INTERFACE ${sodium_STATIC_INCLUDE_DIRS})
endif()
endif()
endif()
add_library(lokimq::lokimq ALIAS lokimq)
add_library(oxenmq::oxenmq ALIAS oxenmq)
export(
TARGETS lokimq
NAMESPACE lokimq::
FILE lokimqTargets.cmake
TARGETS oxenmq
NAMESPACE oxenmq::
FILE oxenmqTargets.cmake
)
if(LOKIMQ_INSTALL)
if(OXENMQ_INSTALL)
install(
TARGETS lokimq
EXPORT lokimqConfig
TARGETS oxenmq
EXPORT oxenmqConfig
DESTINATION ${CMAKE_INSTALL_LIBDIR}
)
install(
FILES lokimq/address.h
lokimq/auth.h
lokimq/base32z.h
lokimq/base64.h
lokimq/batch.h
lokimq/bt_serialize.h
lokimq/bt_value.h
lokimq/connections.h
lokimq/hex.h
lokimq/lokimq.h
lokimq/message.h
lokimq/string_view.h
lokimq/variant.h
${CMAKE_CURRENT_BINARY_DIR}/lokimq/version.h
DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/lokimq
FILES oxenmq/address.h
oxenmq/auth.h
oxenmq/base32z.h
oxenmq/base64.h
oxenmq/batch.h
oxenmq/bt_serialize.h
oxenmq/bt_value.h
oxenmq/connections.h
oxenmq/hex.h
oxenmq/oxenmq.h
oxenmq/message.h
oxenmq/string_view.h
oxenmq/variant.h
${CMAKE_CURRENT_BINARY_DIR}/oxenmq/version.h
DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/oxenmq
)
if(LOKIMQ_INSTALL_CPPZMQ)
if(OXENMQ_INSTALL_CPPZMQ)
install(
FILES cppzmq/zmq.hpp
DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/lokimq
DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/oxenmq
)
endif()
install(
FILES ${CMAKE_CURRENT_BINARY_DIR}/liblokimq.pc
FILES ${CMAKE_CURRENT_BINARY_DIR}/liboxenmq.pc
DESTINATION ${CMAKE_INSTALL_LIBDIR}/pkgconfig
)
endif()
if(LOKIMQ_BUILD_TESTS)
if(OXENMQ_BUILD_TESTS)
add_subdirectory(tests)
endif()

8
liblokimq.pc.in → liboxenmq.pc.in

@ -3,11 +3,11 @@ exec_prefix=${prefix}
libdir=@CMAKE_INSTALL_FULL_LIBDIR@
includedir=@CMAKE_INSTALL_FULL_INCLUDEDIR@
Name: liblokimq
Description: ZeroMQ-based communication library for Loki
Version: @LOKIMQ_VERSION@
Name: liboxenmq
Description: ZeroMQ-based communication library
Version: @OXENMQ_VERSION@
Libs: -L${libdir} -llokimq
Libs: -L${libdir} -loxenmq
Libs.private: @PRIVATE_LIBS@
Requires.private: libzmq libsodium
Cflags: -I${includedir}

5
lokimq/version.h.in

@ -1,5 +0,0 @@
namespace lokimq {
constexpr int VERSION_MAJOR = @LOKIMQ_VERSION_MAJOR@;
constexpr int VERSION_MINOR = @LOKIMQ_VERSION_MINOR@;
constexpr int VERSION_PATCH = @LOKIMQ_VERSION_PATCH@;
}

8
lokimq/address.cpp → oxenmq/address.cpp

@ -9,7 +9,7 @@
#include "base32z.h"
#include "base64.h"
namespace lokimq {
namespace oxenmq {
constexpr size_t enc_length(address::encoding enc) {
return enc == address::encoding::hex ? 64 :
@ -23,13 +23,13 @@ constexpr size_t enc_length(address::encoding enc) {
// given: for QR-friendly we only accept hex or base32z (since QR cannot handle base64's alphabet).
std::string decode_pubkey(std::string_view& in, bool qr) {
std::string pubkey;
if (in.size() >= 64 && lokimq::is_hex(in.substr(0, 64))) {
if (in.size() >= 64 && is_hex(in.substr(0, 64))) {
pubkey = from_hex(in.substr(0, 64));
in.remove_prefix(64);
} else if (in.size() >= 52 && lokimq::is_base32z(in.substr(0, 52))) {
} else if (in.size() >= 52 && is_base32z(in.substr(0, 52))) {
pubkey = from_base32z(in.substr(0, 52));
in.remove_prefix(52);
} else if (!qr && in.size() >= 43 && lokimq::is_base64(in.substr(0, 43))) {
} else if (!qr && in.size() >= 43 && is_base64(in.substr(0, 43))) {
pubkey = from_base64(in.substr(0, 43));
in.remove_prefix(43);
if (!in.empty() && in.front() == '=')

6
lokimq/address.h → oxenmq/address.h

@ -1,4 +1,4 @@
// Copyright (c) 2020, The Loki Project
// Copyright (c) 2020-2021, The Oxen Project
//
// All rights reserved.
//
@ -32,11 +32,11 @@
#include <cstdint>
#include <iosfwd>
namespace lokimq {
namespace oxenmq {
using namespace std::literals;
/** LokiMQ address abstraction class. This class uses and extends standard ZMQ addresses allowing
/** OxenMQ address abstraction class. This class uses and extends standard ZMQ addresses allowing
* extra parameters to be passed in in a relative standard way.
*
* External ZMQ addresses generally have two forms that we are concerned with: one for TCP and one

26
lokimq/auth.cpp → oxenmq/auth.cpp

@ -1,10 +1,10 @@
#include "lokimq.h"
#include "oxenmq.h"
#include "hex.h"
#include "lokimq-internal.h"
#include "oxenmq-internal.h"
#include <ostream>
#include <sstream>
namespace lokimq {
namespace oxenmq {
std::ostream& operator<<(std::ostream& o, AuthLevel a) {
return o << to_string(a);
@ -31,7 +31,7 @@ std::string zmtp_metadata(std::string_view key, std::string_view value) {
}
bool LokiMQ::proxy_check_auth(size_t conn_index, bool outgoing, const peer_info& peer,
bool OxenMQ::proxy_check_auth(size_t conn_index, bool outgoing, const peer_info& peer,
zmq::message_t& cmd, const cat_call_t& cat_call, std::vector<zmq::message_t>& data) {
auto command = view(cmd);
std::string reply;
@ -45,7 +45,7 @@ bool LokiMQ::proxy_check_auth(size_t conn_index, bool outgoing, const peer_info&
reply = "FORBIDDEN";
} else if (cat_call.first->access.local_sn && !local_service_node) {
LMQ_LOG(warn, "Access denied to ", command, " for peer [", to_hex(peer.pubkey), "]/", peer_address(cmd),
": that command is only available when this LokiMQ is running in service node mode");
": that command is only available when this OxenMQ is running in service node mode");
reply = "NOT_A_SERVICE_NODE";
} else if (cat_call.first->access.remote_sn && !peer.service_node) {
LMQ_LOG(warn, "Access denied to ", command, " for peer [", to_hex(peer.pubkey), "]/", peer_address(cmd),
@ -81,7 +81,7 @@ bool LokiMQ::proxy_check_auth(size_t conn_index, bool outgoing, const peer_info&
return false;
}
void LokiMQ::set_active_sns(pubkey_set pubkeys) {
void OxenMQ::set_active_sns(pubkey_set pubkeys) {
if (proxy_thread.joinable()) {
auto data = bt_serialize(detail::serialize_object(std::move(pubkeys)));
detail::send_control(get_control_socket(), "SET_SNS", data);
@ -89,10 +89,10 @@ void LokiMQ::set_active_sns(pubkey_set pubkeys) {
proxy_set_active_sns(std::move(pubkeys));
}
}
void LokiMQ::proxy_set_active_sns(std::string_view data) {
void OxenMQ::proxy_set_active_sns(std::string_view data) {
proxy_set_active_sns(detail::deserialize_object<pubkey_set>(bt_deserialize<uintptr_t>(data)));
}
void LokiMQ::proxy_set_active_sns(pubkey_set pubkeys) {
void OxenMQ::proxy_set_active_sns(pubkey_set pubkeys) {
pubkey_set added, removed;
for (auto it = pubkeys.begin(); it != pubkeys.end(); ) {
auto& pk = *it;
@ -118,7 +118,7 @@ void LokiMQ::proxy_set_active_sns(pubkey_set pubkeys) {
proxy_update_active_sns_clean(std::move(added), std::move(removed));
}
void LokiMQ::update_active_sns(pubkey_set added, pubkey_set removed) {
void OxenMQ::update_active_sns(pubkey_set added, pubkey_set removed) {
LMQ_LOG(info, "uh, ", added.size());
if (proxy_thread.joinable()) {
std::array<uintptr_t, 2> data;
@ -129,12 +129,12 @@ void LokiMQ::update_active_sns(pubkey_set added, pubkey_set removed) {
proxy_update_active_sns(std::move(added), std::move(removed));
}
}
void LokiMQ::proxy_update_active_sns(bt_list_consumer data) {
void OxenMQ::proxy_update_active_sns(bt_list_consumer data) {
auto added = detail::deserialize_object<pubkey_set>(data.consume_integer<uintptr_t>());
auto remed = detail::deserialize_object<pubkey_set>(data.consume_integer<uintptr_t>());
proxy_update_active_sns(std::move(added), std::move(remed));
}
void LokiMQ::proxy_update_active_sns(pubkey_set added, pubkey_set removed) {
void OxenMQ::proxy_update_active_sns(pubkey_set added, pubkey_set removed) {
// We take a caller-provided set of added/removed then filter out any junk (bad pks, conflicting
// values, pubkeys that already(added) or do not(removed) exist), then pass the purified lists
// to the _clean version.
@ -167,7 +167,7 @@ void LokiMQ::proxy_update_active_sns(pubkey_set added, pubkey_set removed) {
proxy_update_active_sns_clean(std::move(added), std::move(removed));
}
void LokiMQ::proxy_update_active_sns_clean(pubkey_set added, pubkey_set removed) {
void OxenMQ::proxy_update_active_sns_clean(pubkey_set added, pubkey_set removed) {
LMQ_LOG(debug, "Updating SN auth status with +", added.size(), "/-", removed.size(), " pubkeys");
// For anything we remove we want close the connection to the SN (if outgoing), and remove the
@ -192,7 +192,7 @@ void LokiMQ::proxy_update_active_sns_clean(pubkey_set added, pubkey_set removed)
active_service_nodes.insert(std::move(pk));
}
void LokiMQ::process_zap_requests() {
void OxenMQ::process_zap_requests() {
for (std::vector<zmq::message_t> frames; recv_message_parts(zap_auth, frames, zmq::recv_flags::dontwait); frames.clear()) {
#ifndef NDEBUG
if (log_level() >= LogLevel::trace) {

4
lokimq/auth.h → oxenmq/auth.h

@ -4,7 +4,7 @@
#include <cstring>
#include <unordered_set>
namespace lokimq {
namespace oxenmq {
/// Authentication levels for command categories and connections
enum class AuthLevel {
@ -45,7 +45,7 @@ struct already_hashed {
};
/// std::unordered_set specialization for specifying pubkeys (used, in particular, by
/// LokiMQ::set_active_sns and LokiMQ::update_active_sns); this is a std::string unordered_set that
/// OxenMQ::set_active_sns and OxenMQ::update_active_sns); this is a std::string unordered_set that
/// also uses a specialized trivial hash function that uses part of the value itself (i.e. the
/// pubkey) directly as a hash value. (This is nice and fast for uniformly distributed values like
/// pubkeys and a terrible hash choice for anything else).

4
lokimq/base32z.h → oxenmq/base32z.h

@ -1,4 +1,4 @@
// Copyright (c) 2019-2020, The Loki Project
// Copyright (c) 2019-2021, The Oxen Project
//
// All rights reserved.
//
@ -34,7 +34,7 @@
#include <cassert>
#include "byte_type.h"
namespace lokimq {
namespace oxenmq {
namespace detail {

4
lokimq/base64.h → oxenmq/base64.h

@ -1,4 +1,4 @@
// Copyright (c) 2019-2020, The Loki Project
// Copyright (c) 2019-2021, The Oxen Project
//
// All rights reserved.
//
@ -34,7 +34,7 @@
#include <cassert>
#include "byte_type.h"
namespace lokimq {
namespace oxenmq {
namespace detail {

18
lokimq/batch.h → oxenmq/batch.h

@ -1,4 +1,4 @@
// Copyright (c) 2020, The Loki Project
// Copyright (c) 2020-2021, The Oxen Project
//
// All rights reserved.
//
@ -30,9 +30,9 @@
#include <exception>
#include <functional>
#include <vector>
#include "lokimq.h"
#include "oxenmq.h"
namespace lokimq {
namespace oxenmq {
namespace detail {
@ -78,7 +78,7 @@ public:
* This is designed to be like a very stripped down version of a std::promise/std::future pair. We
* reimplemented it, however, because by ditching all the thread synchronization that promise/future
* guarantees we can substantially reduce call overhead (by a factor of ~8 according to benchmarking
* code). Since LokiMQ's proxy<->worker communication channel already gives us thread that overhead
* code). Since OxenMQ's proxy<->worker communication channel already gives us thread that overhead
* would just be wasted.
*
* @tparam R the value type held by the result; must be default constructible. Note, however, that
@ -135,13 +135,13 @@ public:
void get() { if (exc) std::rethrow_exception(exc); }
};
/// Helper class used to set up batches of jobs to be scheduled via the lokimq job handler.
/// Helper class used to set up batches of jobs to be scheduled via the oxenmq job handler.
///
/// @tparam R - the return type of the individual jobs
///
template <typename R>
class Batch final : private detail::Batch {
friend class LokiMQ;
friend class OxenMQ;
public:
/// The completion function type, called after all jobs have finished.
using CompletionFunc = std::function<void(std::vector<job_result<R>> results)>;
@ -168,7 +168,7 @@ private:
void check_not_started() {
if (started)
throw std::logic_error("Cannot add jobs or completion function after starting a lokimq::Batch!");
throw std::logic_error("Cannot add jobs or completion function after starting a oxenmq::Batch!");
}
public:
@ -202,7 +202,7 @@ public:
/// \param thread - optional tagged thread in which to schedule the completion job. If not
/// provided then the completion job is scheduled in the pool of batch job threads.
///
/// `thread` can be provided the value &LokiMQ::run_in_proxy to invoke the completion function
/// `thread` can be provided the value &OxenMQ::run_in_proxy to invoke the completion function
/// *IN THE PROXY THREAD* itself after all jobs have finished. Be very, very careful: this
/// should be a nearly trivial job that does not require any substantial CPU time and does not
/// block for any reason. This is only intended for the case where the completion job is so
@ -268,7 +268,7 @@ private:
template <typename R>
void LokiMQ::batch(Batch<R>&& batch) {
void OxenMQ::batch(Batch<R>&& batch) {
if (batch.size().first == 0)
throw std::logic_error("Cannot batch a a job batch with 0 jobs");
// Need to send this over to the proxy thread via the base class pointer. It assumes ownership.

6
lokimq/bt_serialize.cpp → oxenmq/bt_serialize.cpp

@ -1,4 +1,4 @@
// Copyright (c) 2019-2020, The Loki Project
// Copyright (c) 2019-2021, The Oxen Project
//
// All rights reserved.
//
@ -29,7 +29,7 @@
#include "bt_serialize.h"
#include <iterator>
namespace lokimq {
namespace oxenmq {
namespace detail {
/// Reads digits into an unsigned 64-bit int.
@ -228,4 +228,4 @@ std::pair<std::string_view, std::string_view> bt_dict_consumer::next_string() {
}
} // namespace lokimq
} // namespace oxenmq

10
lokimq/bt_serialize.h → oxenmq/bt_serialize.h

@ -1,4 +1,4 @@
// Copyright (c) 2019-2020, The Loki Project
// Copyright (c) 2019-2020, The Oxen Project
//
// All rights reserved.
//
@ -46,12 +46,12 @@
#include "bt_value.h"
namespace lokimq {
namespace oxenmq {
using namespace std::literals;
/** \file
* LokiMQ serialization for internal commands is very simple: we support two primitive types,
* OxenMQ serialization for internal commands is very simple: we support two primitive types,
* strings and integers, and two container types, lists and dicts with string keys. On the wire
* these go in BitTorrent byte encoding as described in BEP-0003
* (https://www.bittorrent.org/beps/bep_0003.html#bencoding).
@ -596,7 +596,7 @@ template <typename T, typename It>
void get_tuple_impl_one(T& t, It& it) {
const bt_variant& v = *it++;
if constexpr (std::is_integral_v<T>) {
t = lokimq::get_int<T>(v);
t = oxenmq::get_int<T>(v);
} else if constexpr (is_bt_tuple<T>) {
if (std::holds_alternative<bt_list>(v))
throw std::invalid_argument{"Unable to convert tuple: cannot create sub-tuple from non-bt_list"};
@ -912,4 +912,4 @@ public:
};
} // namespace lokimq
} // namespace oxenmq

4
lokimq/bt_value.h → oxenmq/bt_value.h

@ -1,4 +1,4 @@
// Copyright (c) 2019-2020, The Loki Project
// Copyright (c) 2019-2021, The Oxen Project
//
// All rights reserved.
//
@ -38,7 +38,7 @@
#include <string>
#include <string_view>
namespace lokimq {
namespace oxenmq {
struct bt_value;

2
lokimq/byte_type.h → oxenmq/byte_type.h

@ -6,7 +6,7 @@
#include <iterator>
#include <type_traits>
namespace lokimq::detail {
namespace oxenmq::detail {
// Fallback - we just try a char
template <typename OutputIt, typename = void>

36
lokimq/connections.cpp → oxenmq/connections.cpp

@ -1,8 +1,8 @@
#include "lokimq.h"
#include "lokimq-internal.h"
#include "oxenmq.h"
#include "oxenmq-internal.h"
#include "hex.h"
namespace lokimq {
namespace oxenmq {
std::ostream& operator<<(std::ostream& o, const ConnectionID& conn) {
if (!conn.pk.empty())
@ -24,7 +24,7 @@ void add_pollitem(std::vector<zmq::pollitem_t>& pollitems, zmq::socket_t& sock)
} // anonymous namespace
void LokiMQ::rebuild_pollitems() {
void OxenMQ::rebuild_pollitems() {
pollitems.clear();
add_pollitem(pollitems, command);
add_pollitem(pollitems, workers_socket);
@ -35,7 +35,7 @@ void LokiMQ::rebuild_pollitems() {
pollitems_stale = false;
}
void LokiMQ::setup_external_socket(zmq::socket_t& socket) {
void OxenMQ::setup_external_socket(zmq::socket_t& socket) {
socket.set(zmq::sockopt::reconnect_ivl, (int) RECONNECT_INTERVAL.count());
socket.set(zmq::sockopt::reconnect_ivl_max, (int) RECONNECT_INTERVAL_MAX.count());
socket.set(zmq::sockopt::handshake_ivl, (int) HANDSHAKE_TIME.count());
@ -47,7 +47,7 @@ void LokiMQ::setup_external_socket(zmq::socket_t& socket) {
}
}
void LokiMQ::setup_outgoing_socket(zmq::socket_t& socket, std::string_view remote_pubkey) {
void OxenMQ::setup_outgoing_socket(zmq::socket_t& socket, std::string_view remote_pubkey) {
setup_external_socket(socket);
@ -67,7 +67,7 @@ void LokiMQ::setup_outgoing_socket(zmq::socket_t& socket, std::string_view remot
// else let ZMQ pick a random one
}
ConnectionID LokiMQ::connect_sn(std::string_view pubkey, std::chrono::milliseconds keep_alive, std::string_view hint) {
ConnectionID OxenMQ::connect_sn(std::string_view pubkey, std::chrono::milliseconds keep_alive, std::string_view hint) {
if (!proxy_thread.joinable())
throw std::logic_error("Cannot call connect_sn() before calling `start()`");
@ -76,7 +76,7 @@ ConnectionID LokiMQ::connect_sn(std::string_view pubkey, std::chrono::millisecon
return pubkey;
}
ConnectionID LokiMQ::connect_remote(const address& remote, ConnectSuccess on_connect, ConnectFailure on_failure,
ConnectionID OxenMQ::connect_remote(const address& remote, ConnectSuccess on_connect, ConnectFailure on_failure,
AuthLevel auth_level, std::chrono::milliseconds timeout) {
if (!proxy_thread.joinable())
throw std::logic_error("Cannot call connect_remote() before calling `start()`");
@ -96,13 +96,13 @@ ConnectionID LokiMQ::connect_remote(const address& remote, ConnectSuccess on_con
return id;
}
ConnectionID LokiMQ::connect_remote(std::string_view remote, ConnectSuccess on_connect, ConnectFailure on_failure,
ConnectionID OxenMQ::connect_remote(std::string_view remote, ConnectSuccess on_connect, ConnectFailure on_failure,
std::string_view pubkey, AuthLevel auth_level, std::chrono::milliseconds timeout) {
return connect_remote(address{remote}.set_pubkey(pubkey),
std::move(on_connect), std::move(on_failure), auth_level, timeout);
}
void LokiMQ::disconnect(ConnectionID id, std::chrono::milliseconds linger) {
void OxenMQ::disconnect(ConnectionID id, std::chrono::milliseconds linger) {
detail::send_control(get_control_socket(), "DISCONNECT", bt_serialize<bt_dict>({
{"conn_id", id.id},
{"linger_ms", linger.count()},
@ -111,7 +111,7 @@ void LokiMQ::disconnect(ConnectionID id, std::chrono::milliseconds linger) {
}
std::pair<zmq::socket_t *, std::string>
LokiMQ::proxy_connect_sn(std::string_view remote, std::string_view connect_hint, bool optional, bool incoming_only, bool outgoing_only, std::chrono::milliseconds keep_alive) {
OxenMQ::proxy_connect_sn(std::string_view remote, std::string_view connect_hint, bool optional, bool incoming_only, bool outgoing_only, std::chrono::milliseconds keep_alive) {
ConnectionID remote_cid{remote};
auto its = peers.equal_range(remote_cid);
peer_info* peer = nullptr;
@ -186,7 +186,7 @@ LokiMQ::proxy_connect_sn(std::string_view remote, std::string_view connect_hint,
return {&connections.back(), ""s};
}
std::pair<zmq::socket_t *, std::string> LokiMQ::proxy_connect_sn(bt_dict_consumer data) {
std::pair<zmq::socket_t *, std::string> OxenMQ::proxy_connect_sn(bt_dict_consumer data) {
std::string_view hint, remote_pk;
std::chrono::milliseconds keep_alive;
bool optional = false, incoming_only = false, outgoing_only = false;
@ -226,7 +226,7 @@ void update_connection_indices(Container& c, size_t index, AccessIndex get_index
/// Closes outgoing connections and removes all references. Note that this will call `erase()`
/// which can invalidate iterators on the various connection containers - if you don't want that,
/// delete it first so that the container won't contain the element being deleted.
void LokiMQ::proxy_close_connection(size_t index, std::chrono::milliseconds linger) {
void OxenMQ::proxy_close_connection(size_t index, std::chrono::milliseconds linger) {
connections[index].set(zmq::sockopt::linger, linger > 0ms ? (int) linger.count() : 0);
pollitems_stale = true;
connections.erase(connections.begin() + index);
@ -244,7 +244,7 @@ void LokiMQ::proxy_close_connection(size_t index, std::chrono::milliseconds ling
conn_index_to_id.erase(conn_index_to_id.begin() + index);
}
void LokiMQ::proxy_expire_idle_peers() {
void OxenMQ::proxy_expire_idle_peers() {
for (auto it = peers.begin(); it != peers.end(); ) {
auto &info = it->second;
if (info.outgoing()) {
@ -267,7 +267,7 @@ void LokiMQ::proxy_expire_idle_peers() {
}
}
void LokiMQ::proxy_conn_cleanup() {
void OxenMQ::proxy_conn_cleanup() {
LMQ_TRACE("starting proxy connections cleanup");
// Drop idle connections (if we haven't done it in a while)
@ -307,7 +307,7 @@ void LokiMQ::proxy_conn_cleanup() {
LMQ_TRACE("done proxy connections cleanup");
};
void LokiMQ::proxy_connect_remote(bt_dict_consumer data) {
void OxenMQ::proxy_connect_remote(bt_dict_consumer data) {
AuthLevel auth_level = AuthLevel::none;
long long conn_id = -1;
ConnectSuccess on_connect;
@ -372,7 +372,7 @@ void LokiMQ::proxy_connect_remote(bt_dict_consumer data) {
peers.emplace(std::move(conn), std::move(peer));
}
void LokiMQ::proxy_disconnect(bt_dict_consumer data) {
void OxenMQ::proxy_disconnect(bt_dict_consumer data) {
ConnectionID connid{-1};
std::chrono::milliseconds linger = 1s;
@ -388,7 +388,7 @@ void LokiMQ::proxy_disconnect(bt_dict_consumer data) {
proxy_disconnect(std::move(connid), linger);
}
void LokiMQ::proxy_disconnect(ConnectionID conn, std::chrono::milliseconds linger) {
void OxenMQ::proxy_disconnect(ConnectionID conn, std::chrono::milliseconds linger) {
LMQ_TRACE("Disconnecting outgoing connection to ", conn);
auto pr = peers.equal_range(conn);
for (auto it = pr.first; it != pr.second; ++it) {

12
lokimq/connections.h → oxenmq/connections.h

@ -8,7 +8,7 @@
#include <utility>
#include <variant>
namespace lokimq {
namespace oxenmq {
struct ConnectionID;
@ -77,18 +77,18 @@ private:
long long id = 0;
std::string pk;
std::string route;
friend class LokiMQ;
friend class OxenMQ;
friend struct std::hash<ConnectionID>;
template <typename... T>
friend bt_dict detail::build_send(ConnectionID to, std::string_view cmd, T&&... opts);
friend std::ostream& operator<<(std::ostream& o, const ConnectionID& conn);
};
} // namespace lokimq
} // namespace oxenmq
namespace std {
template <> struct hash<lokimq::ConnectionID> {
size_t operator()(const lokimq::ConnectionID &c) const {
return c.sn() ? lokimq::already_hashed{}(c.pk) :
template <> struct hash<oxenmq::ConnectionID> {
size_t operator()(const oxenmq::ConnectionID &c) const {
return c.sn() ? oxenmq::already_hashed{}(c.pk) :
std::hash<long long>{}(c.id) + std::hash<std::string>{}(c.route);
}
};

4
lokimq/hex.h → oxenmq/hex.h

@ -1,4 +1,4 @@
// Copyright (c) 2019-2020, The Loki Project
// Copyright (c) 2019-2021, The Oxen Project
//
// All rights reserved.
//
@ -34,7 +34,7 @@
#include <cassert>
#include "byte_type.h"
namespace lokimq {
namespace oxenmq {
namespace detail {

32
lokimq/jobs.cpp → oxenmq/jobs.cpp

@ -1,10 +1,10 @@
#include "lokimq.h"
#include "oxenmq.h"
#include "batch.h"
#include "lokimq-internal.h"
#include "oxenmq-internal.h"
namespace lokimq {
namespace oxenmq {
void LokiMQ::proxy_batch(detail::Batch* batch) {
void OxenMQ::proxy_batch(detail::Batch* batch) {
batches.insert(batch);
const auto [jobs, tagged_threads] = batch->size();
LMQ_TRACE("proxy queuing batch job with ", jobs, " jobs", tagged_threads ? " (job uses tagged thread(s))" : "");
@ -26,7 +26,7 @@ void LokiMQ::proxy_batch(detail::Batch* batch) {
proxy_skip_one_poll = true;
}
void LokiMQ::job(std::function<void()> f, std::optional<TaggedThreadID> thread) {
void OxenMQ::job(std::function<void()> f, std::optional<TaggedThreadID> thread) {
if (thread && thread->_id == -1)
throw std::logic_error{"job() cannot be used to queue an in-proxy job"};
auto* b = new Batch<void>;
@ -35,7 +35,7 @@ void LokiMQ::job(std::function<void()> f, std::optional<TaggedThreadID> thread)
detail::send_control(get_control_socket(), "BATCH", bt_serialize(reinterpret_cast<uintptr_t>(baseptr)));
}
void LokiMQ::proxy_schedule_reply_job(std::function<void()> f) {
void OxenMQ::proxy_schedule_reply_job(std::function<void()> f) {
auto* b = new Batch<void>;
b->add_job(std::move(f));
batches.insert(b);
@ -43,7 +43,7 @@ void LokiMQ::proxy_schedule_reply_job(std::function<void()> f) {
proxy_skip_one_poll = true;
}
void LokiMQ::proxy_run_batch_jobs(std::queue<batch_job>& jobs, const int reserved, int& active, bool reply) {
void OxenMQ::proxy_run_batch_jobs(std::queue<batch_job>& jobs, const int reserved, int& active, bool reply) {
while (!jobs.empty() && active_workers() < max_workers &&
(active < reserved || active_workers() < general_workers)) {
proxy_run_worker(get_idle_worker().load(std::move(jobs.front()), reply));
@ -54,20 +54,20 @@ void LokiMQ::proxy_run_batch_jobs(std::queue<batch_job>& jobs, const int reserve
// Called either within the proxy thread, or before the proxy thread has been created; actually adds
// the timer. If the timer object hasn't been set up yet it gets set up here.
void LokiMQ::proxy_timer(std::function<void()> job, std::chrono::milliseconds interval, bool squelch, int thread) {
void OxenMQ::proxy_timer(std::function<void()> job, std::chrono::milliseconds interval, bool squelch, int thread) {
if (!timers)
timers.reset(zmq_timers_new());
int timer_id = zmq_timers_add(timers.get(),
interval.count(),
[](int timer_id, void* self) { static_cast<LokiMQ*>(self)->_queue_timer_job(timer_id); },
[](int timer_id, void* self) { static_cast<OxenMQ*>(self)->_queue_timer_job(timer_id); },
this);
if (timer_id == -1)
throw zmq::error_t{};
timer_jobs[timer_id] = { std::move(job), squelch, false, thread };
}
void LokiMQ::proxy_timer(bt_list_consumer timer_data) {
void OxenMQ::proxy_timer(bt_list_consumer timer_data) {
std::unique_ptr<std::function<void()>> func{reinterpret_cast<std::function<void()>*>(timer_data.consume_integer<uintptr_t>())};
auto interval = std::chrono::milliseconds{timer_data.consume_integer<uint64_t>()};
auto squelch = timer_data.consume_integer<bool>();
@ -77,7 +77,7 @@ void LokiMQ::proxy_timer(bt_list_consumer timer_data) {
proxy_timer(std::move(*func), interval, squelch, thread);
}
void LokiMQ::_queue_timer_job(int timer_id) {
void OxenMQ::_queue_timer_job(int timer_id) {
auto it = timer_jobs.find(timer_id);
if (it == timer_jobs.end()) {
LMQ_LOG(warn, "Could not find timer job ", timer_id);
@ -107,7 +107,7 @@ void LokiMQ::_queue_timer_job(int timer_id) {
auto it = timer_jobs.find(timer_id);
if (it != timer_jobs.end())
it->second.running = false;
}, LokiMQ::run_in_proxy);
}, OxenMQ::run_in_proxy);
}
batches.insert(b);
LMQ_TRACE("b: ", b->size().first, ", ", b->size().second, "; thread = ", thread);
@ -118,7 +118,7 @@ void LokiMQ::_queue_timer_job(int timer_id) {
queue.emplace(static_cast<detail::Batch*>(b), 0);
}
void LokiMQ::add_timer(std::function<void()> job, std::chrono::milliseconds interval, bool squelch, std::optional<TaggedThreadID> thread) {
void OxenMQ::add_timer(std::function<void()> job, std::chrono::milliseconds interval, bool squelch, std::optional<TaggedThreadID> thread) {
int th_id = thread ? thread->_id : 0;
if (proxy_thread.joinable()) {
detail::send_control(get_control_socket(), "TIMER", bt_serialize(bt_list{{
@ -131,9 +131,9 @@ void LokiMQ::add_timer(std::function<void()> job, std::chrono::milliseconds inte
}
}
void LokiMQ::TimersDeleter::operator()(void* timers) { zmq_timers_destroy(&timers); }
void OxenMQ::TimersDeleter::operator()(void* timers) { zmq_timers_destroy(&timers); }
TaggedThreadID LokiMQ::add_tagged_thread(std::string name, std::function<void()> start) {
TaggedThreadID OxenMQ::add_tagged_thread(std::string name, std::function<void()> start) {
if (proxy_thread.joinable())
throw std::logic_error{"Cannot add tagged threads after calling `start()`"};
@ -146,7 +146,7 @@ TaggedThreadID LokiMQ::add_tagged_thread(std::string name, std::function<void()>
run.worker_routing_id = "t" + std::to_string(run.worker_id);
LMQ_TRACE("Created new tagged thread ", name, " with routing id ", run.worker_routing_id);
run.worker_thread = std::thread{&LokiMQ::worker_thread, this, run.worker_id, name, std::move(start)};
run.worker_thread = std::thread{&OxenMQ::worker_thread, this, run.worker_id, name, std::move(start)};
return TaggedThreadID{static_cast<int>(run.worker_id)};
}

10
lokimq/message.h → oxenmq/message.h

@ -2,16 +2,16 @@
#include <vector>
#include "connections.h"
namespace lokimq {
namespace oxenmq {
class LokiMQ;
class OxenMQ;
/// Encapsulates an incoming message from a remote connection with message details plus extra
/// info need to send a reply back through the proxy thread via the `reply()` method. Note that
/// this object gets reused: callbacks should use but not store any reference beyond the callback.
class Message {
public:
LokiMQ& lokimq; ///< The owning LokiMQ object
OxenMQ& oxenmq; ///< The owning OxenMQ object
std::vector<std::string_view> data; ///< The provided command data parts, if any.
ConnectionID conn; ///< The connection info for routing a reply; also contains the pubkey/sn status.
std::string reply_tag; ///< If the invoked command is a request command this is the required reply tag that will be prepended by `send_reply()`.
@ -19,8 +19,8 @@ public:
std::string remote; ///< Some sort of remote address from which the request came. Often "IP" for TCP connections and "localhost:UID:GID:PID" for UDP connections.
/// Constructor
Message(LokiMQ& lmq, ConnectionID cid, Access access, std::string remote)
: lokimq{lmq}, conn{std::move(cid)}, access{std::move(access)}, remote{std::move(remote)} {}
Message(OxenMQ& lmq, ConnectionID cid, Access access, std::string remote)
: oxenmq{lmq}, conn{std::move(cid)}, access{std::move(access)}, remote{std::move(remote)} {}
// Non-copyable
Message(const Message&) = delete;

4
lokimq/lokimq-internal.h → oxenmq/oxenmq-internal.h

@ -1,5 +1,5 @@
#pragma once
#include "lokimq.h"
#include "oxenmq.h"
// Inside some method:
// LMQ_LOG(warn, "bad ", 42, " stuff");
@ -13,7 +13,7 @@
# define LMQ_TRACE(...)
#endif
namespace lokimq {
namespace oxenmq {
constexpr char SN_ADDR_COMMAND[] = "inproc://sn-command";
constexpr char SN_ADDR_WORKERS[] = "inproc://sn-workers";

76
lokimq/lokimq.cpp → oxenmq/oxenmq.cpp

@ -1,5 +1,5 @@
#include "lokimq.h"
#include "lokimq-internal.h"
#include "oxenmq.h"
#include "oxenmq-internal.h"
#include "zmq.hpp"
#include <map>
#include <random>
@ -13,7 +13,7 @@ extern "C" {
}
#include "hex.h"
namespace lokimq {
namespace oxenmq {
namespace {
@ -76,20 +76,20 @@ std::pair<std::string, AuthLevel> extract_metadata(zmq::message_t& msg) {
} // namespace detail
void LokiMQ::set_zmq_context_option(zmq::ctxopt option, int value) {
void OxenMQ::set_zmq_context_option(zmq::ctxopt option, int value) {
context.set(option, value);
}
void LokiMQ::log_level(LogLevel level) {
void OxenMQ::log_level(LogLevel level) {
log_lvl.store(level, std::memory_order_relaxed);
}
LogLevel LokiMQ::log_level() const {
LogLevel OxenMQ::log_level() const {
return log_lvl.load(std::memory_order_relaxed);
}
CatHelper LokiMQ::add_category(std::string name, Access access_level, unsigned int reserved_threads, int max_queue) {
CatHelper OxenMQ::add_category(std::string name, Access access_level, unsigned int reserved_threads, int max_queue) {
check_not_started(proxy_thread, "add a category");
if (name.size() > MAX_CATEGORY_LENGTH)
@ -107,7 +107,7 @@ CatHelper LokiMQ::add_category(std::string name, Access access_level, unsigned i
return ret;
}
void LokiMQ::add_command(const std::string& category, std::string name, CommandCallback callback) {
void OxenMQ::add_command(const std::string& category, std::string name, CommandCallback callback) {
check_not_started(proxy_thread, "add a command");
if (name.size() > MAX_COMMAND_LENGTH)
@ -126,12 +126,12 @@ void LokiMQ::add_command(const std::string& category, std::string name, CommandC
throw std::runtime_error("Cannot add command `" + fullname + "': that command already exists");
}
void LokiMQ::add_request_command(const std::string& category, std::string name, CommandCallback callback) {
void OxenMQ::add_request_command(const std::string& category, std::string name, CommandCallback callback) {
add_command(category, name, std::move(callback));
categories.at(category).commands.at(name).second = true;
}
void LokiMQ::add_command_alias(std::string from, std::string to) {
void OxenMQ::add_command_alias(std::string from, std::string to) {
check_not_started(proxy_thread, "add a command alias");
if (from.empty())
@ -160,10 +160,10 @@ std::atomic<int> next_id{1};
/// Accesses a thread-local command socket connected to the proxy's command socket used to issue
/// commands in a thread-safe manner. A mutex is only required here the first time a thread
/// accesses the control socket.
zmq::socket_t& LokiMQ::get_control_socket() {
zmq::socket_t& OxenMQ::get_control_socket() {
assert(proxy_thread.joinable());
// Optimize by caching the last value; LokiMQ is often a singleton and in that case we're
// Optimize by caching the last value; OxenMQ is often a singleton and in that case we're
// going to *always* hit this optimization. Even if it isn't, we're probably likely to need the
// same control socket from the same thread multiple times sequentially so this may still help.
static thread_local int last_id = -1;
@ -174,7 +174,7 @@ zmq::socket_t& LokiMQ::get_control_socket() {
std::lock_guard lock{control_sockets_mutex};
if (proxy_shutting_down)
throw std::runtime_error("Unable to obtain LokiMQ control socket: proxy thread is shutting down");
throw std::runtime_error("Unable to obtain OxenMQ control socket: proxy thread is shutting down");
auto& socket = control_sockets[std::this_thread::get_id()];
if (!socket) {
@ -188,7 +188,7 @@ zmq::socket_t& LokiMQ::get_control_socket() {
}
LokiMQ::LokiMQ(
OxenMQ::OxenMQ(
std::string pubkey_,
std::string privkey_,
bool service_node,
@ -199,17 +199,17 @@ LokiMQ::LokiMQ(
sn_lookup{std::move(lookup)}, log_lvl{level}, logger{std::move(logger)}
{
LMQ_TRACE("Constructing LokiMQ, id=", object_id, ", this=", this);
LMQ_TRACE("Constructing OxenMQ, id=", object_id, ", this=", this);
if (sodium_init() == -1)
throw std::runtime_error{"libsodium initialization failed"};
if (pubkey.empty() != privkey.empty()) {
throw std::invalid_argument("LokiMQ construction failed: one (and only one) of pubkey/privkey is empty. Both must be specified, or both empty to generate a key.");
throw std::invalid_argument("OxenMQ construction failed: one (and only one) of pubkey/privkey is empty. Both must be specified, or both empty to generate a key.");
} else if (pubkey.empty()) {
if (service_node)
throw std::invalid_argument("Cannot construct a service node mode LokiMQ without a keypair");
LMQ_LOG(debug, "generating x25519 keypair for remote-only LokiMQ instance");
throw std::invalid_argument("Cannot construct a service node mode OxenMQ without a keypair");
LMQ_LOG(debug, "generating x25519 keypair for remote-only OxenMQ instance");
pubkey.resize(crypto_box_PUBLICKEYBYTES);
privkey.resize(crypto_box_SECRETKEYBYTES);
crypto_box_keypair(reinterpret_cast<unsigned char*>(&pubkey[0]), reinterpret_cast<unsigned char*>(&privkey[0]));
@ -224,11 +224,11 @@ LokiMQ::LokiMQ(
std::string verify_pubkey(crypto_box_PUBLICKEYBYTES, 0