From 0858dd278b91a899b69210088622ca6fa3bb0eed Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Wed, 31 Aug 2022 11:59:24 -0300 Subject: [PATCH 1/5] oxen-encoding submodule to latest tagged release --- oxen-encoding | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/oxen-encoding b/oxen-encoding index 085bbff..707a836 160000 --- a/oxen-encoding +++ b/oxen-encoding @@ -1 +1 @@ -Subproject commit 085bbff774106dfd41c4848ef03e4de259accf4d +Subproject commit 707a83609fb64d09b61ed1e56c82bf692050d2a1 From 25f714371b73ab81454fe9089f49d4f3601a1d7c Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Wed, 28 Sep 2022 13:28:48 -0300 Subject: [PATCH 2/5] Remove deprecated code - Removes the old lokimq name compatibility shims - Removes the oxenmq::bt* -> oxenc::bt* shim headers --- CMakeLists.txt | 32 ----------------------------- liblokimq.pc.in | 14 ------------- lokimq/address.h | 4 ---- lokimq/auth.h | 4 ---- lokimq/base32z.h | 4 ---- lokimq/base64.h | 4 ---- lokimq/batch.h | 4 ---- lokimq/bt_serialize.h | 4 ---- lokimq/bt_value.h | 4 ---- lokimq/connections.h | 4 ---- lokimq/hex.h | 4 ---- lokimq/lokimq.h | 10 --------- lokimq/message.h | 4 ---- lokimq/variant.h | 2 -- lokimq/version.h | 4 ---- oxenmq/bt_producer.h | 12 ----------- oxenmq/bt_serialize.h | 48 ------------------------------------------- oxenmq/bt_value.h | 45 ---------------------------------------- 18 files changed, 207 deletions(-) delete mode 100644 liblokimq.pc.in delete mode 100644 lokimq/address.h delete mode 100644 lokimq/auth.h delete mode 100644 lokimq/base32z.h delete mode 100644 lokimq/base64.h delete mode 100644 lokimq/batch.h delete mode 100644 lokimq/bt_serialize.h delete mode 100644 lokimq/bt_value.h delete mode 100644 lokimq/connections.h delete mode 100644 lokimq/hex.h delete mode 100644 lokimq/lokimq.h delete mode 100644 lokimq/message.h delete mode 100644 lokimq/variant.h delete mode 100644 lokimq/version.h delete mode 100644 oxenmq/bt_producer.h delete mode 100644 oxenmq/bt_serialize.h delete mode 100644 oxenmq/bt_value.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 269d92d..10667af 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -42,15 +42,11 @@ endif() option(OXENMQ_BUILD_TESTS "Building and perform oxenmq tests" ${oxenmq_IS_TOPLEVEL_PROJECT}) option(OXENMQ_INSTALL "Add oxenmq libraries and headers to cmake install target; defaults to ON if BUILD_SHARED_LIBS is enabled or we are the top-level project; OFF for a static subdirectory build" ${oxenmq_INSTALL_DEFAULT}) option(OXENMQ_INSTALL_CPPZMQ "Install cppzmq header with oxenmq/ headers (requires OXENMQ_INSTALL)" ON) -option(OXENMQ_LOKIMQ_COMPAT "Install lokimq compatibility headers and pkg-config for rename migration" ON) list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake") configure_file(oxenmq/version.h.in oxenmq/version.h @ONLY) configure_file(liboxenmq.pc.in liboxenmq.pc @ONLY) -if(OXENMQ_LOKIMQ_COMPAT) - configure_file(liblokimq.pc.in liblokimq.pc @ONLY) -endif() add_library(oxenmq @@ -186,9 +182,6 @@ else() endif() add_library(oxenmq::oxenmq ALIAS oxenmq) -if(OXENMQ_LOKIMQ_COMPAT) - add_library(lokimq::lokimq ALIAS oxenmq) -endif() export( TARGETS oxenmq @@ -234,31 +227,6 @@ if(OXENMQ_INSTALL) DESTINATION ${CMAKE_INSTALL_LIBDIR}/pkgconfig ) - if(OXENMQ_LOKIMQ_COMPAT) - install( - FILES lokimq/address.h - lokimq/auth.h - lokimq/base32z.h - lokimq/base64.h - lokimq/batch.h - lokimq/bt_serialize.h - lokimq/bt_value.h - lokimq/connections.h - lokimq/hex.h - lokimq/lokimq.h - lokimq/message.h - lokimq/variant.h - lokimq/version.h - DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/lokimq - ) - - install( - FILES ${CMAKE_CURRENT_BINARY_DIR}/liblokimq.pc - DESTINATION ${CMAKE_INSTALL_LIBDIR}/pkgconfig - ) - endif() - - endif() if(OXENMQ_BUILD_TESTS) diff --git a/liblokimq.pc.in b/liblokimq.pc.in deleted file mode 100644 index 8f4c5f0..0000000 --- a/liblokimq.pc.in +++ /dev/null @@ -1,14 +0,0 @@ -prefix=@CMAKE_INSTALL_PREFIX@ -exec_prefix=${prefix} -libdir=@CMAKE_INSTALL_FULL_LIBDIR@ -includedir=@CMAKE_INSTALL_FULL_INCLUDEDIR@ - -Name: liblokimq -Description: ZeroMQ-based communication library (compatibility package for liboxenmq) -Version: @PROJECT_VERSION@ - -Libs: -L${libdir} -loxenmq -Libs.private: @PRIVATE_LIBS@ -Requires: liboxenc -Requires.private: libzmq libsodium -Cflags: -I${includedir} diff --git a/lokimq/address.h b/lokimq/address.h deleted file mode 100644 index 5239b39..0000000 --- a/lokimq/address.h +++ /dev/null @@ -1,4 +0,0 @@ -#pragma once -#include "../oxenmq/address.h" - -namespace lokimq = oxenmq; diff --git a/lokimq/auth.h b/lokimq/auth.h deleted file mode 100644 index d6c38f6..0000000 --- a/lokimq/auth.h +++ /dev/null @@ -1,4 +0,0 @@ -#pragma once -#include "../oxenmq/auth.h" - -namespace lokimq = oxenmq; diff --git a/lokimq/base32z.h b/lokimq/base32z.h deleted file mode 100644 index 0f40acf..0000000 --- a/lokimq/base32z.h +++ /dev/null @@ -1,4 +0,0 @@ -#pragma once -#include "../oxenmq/base32z.h" - -namespace lokimq = oxenmq; diff --git a/lokimq/base64.h b/lokimq/base64.h deleted file mode 100644 index 36f2c7d..0000000 --- a/lokimq/base64.h +++ /dev/null @@ -1,4 +0,0 @@ -#pragma once -#include "../oxenmq/base64.h" - -namespace lokimq = oxenmq; diff --git a/lokimq/batch.h b/lokimq/batch.h deleted file mode 100644 index bf06f14..0000000 --- a/lokimq/batch.h +++ /dev/null @@ -1,4 +0,0 @@ -#pragma once -#include "../oxenmq/batch.h" - -namespace lokimq = oxenmq; diff --git a/lokimq/bt_serialize.h b/lokimq/bt_serialize.h deleted file mode 100644 index 1d904a6..0000000 --- a/lokimq/bt_serialize.h +++ /dev/null @@ -1,4 +0,0 @@ -#pragma once -#include "../oxenmq/bt_serialize.h" - -namespace lokimq = oxenmq; diff --git a/lokimq/bt_value.h b/lokimq/bt_value.h deleted file mode 100644 index c311b76..0000000 --- a/lokimq/bt_value.h +++ /dev/null @@ -1,4 +0,0 @@ -#pragma once -#include "../oxenmq/bt_value.h" - -namespace lokimq = oxenmq; diff --git a/lokimq/connections.h b/lokimq/connections.h deleted file mode 100644 index 5775338..0000000 --- a/lokimq/connections.h +++ /dev/null @@ -1,4 +0,0 @@ -#pragma once -#include "../oxenmq/connections.h" - -namespace lokimq = oxenmq; diff --git a/lokimq/hex.h b/lokimq/hex.h deleted file mode 100644 index 08c3b58..0000000 --- a/lokimq/hex.h +++ /dev/null @@ -1,4 +0,0 @@ -#pragma once -#include "../oxenmq/hex.h" - -namespace lokimq = oxenmq; diff --git a/lokimq/lokimq.h b/lokimq/lokimq.h deleted file mode 100644 index 57ed082..0000000 --- a/lokimq/lokimq.h +++ /dev/null @@ -1,10 +0,0 @@ -#pragma once -#include "../oxenmq/oxenmq.h" - -namespace lokimq = oxenmq; - -namespace oxenmq { - -using LokiMQ = OxenMQ; - -} diff --git a/lokimq/message.h b/lokimq/message.h deleted file mode 100644 index a0e8ad5..0000000 --- a/lokimq/message.h +++ /dev/null @@ -1,4 +0,0 @@ -#pragma once -#include "../oxenmq/message.h" - -namespace lokimq = oxenmq; diff --git a/lokimq/variant.h b/lokimq/variant.h deleted file mode 100644 index 08cb791..0000000 --- a/lokimq/variant.h +++ /dev/null @@ -1,2 +0,0 @@ -#pragma once -#include "../oxenmq/variant.h" diff --git a/lokimq/version.h b/lokimq/version.h deleted file mode 100644 index d49e184..0000000 --- a/lokimq/version.h +++ /dev/null @@ -1,4 +0,0 @@ -#pragma once -#include "../oxenmq/version.h" - -namespace lokimq = oxenmq; diff --git a/oxenmq/bt_producer.h b/oxenmq/bt_producer.h deleted file mode 100644 index 67325fd..0000000 --- a/oxenmq/bt_producer.h +++ /dev/null @@ -1,12 +0,0 @@ -#pragma once - -#include - -// Compatibility shim for oxenc includes - -namespace oxenmq { - -using oxenc::bt_list_producer; -using oxenc::bt_dict_producer; - -} // namespace oxenmq diff --git a/oxenmq/bt_serialize.h b/oxenmq/bt_serialize.h deleted file mode 100644 index 07dca2e..0000000 --- a/oxenmq/bt_serialize.h +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright (c) 2019-2020, The Oxen Project -// -// All rights reserved. -// -// Redistribution and use in source and binary forms, with or without modification, are -// permitted provided that the following conditions are met: -// -// 1. Redistributions of source code must retain the above copyright notice, this list of -// conditions and the following disclaimer. -// -// 2. Redistributions in binary form must reproduce the above copyright notice, this list -// of conditions and the following disclaimer in the documentation and/or other -// materials provided with the distribution. -// -// 3. Neither the name of the copyright holder nor the names of its contributors may be -// used to endorse or promote products derived from this software without specific -// prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY -// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF -// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL -// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, -// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF -// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -#pragma once - -// Compatibility shim for oxenc includes - -#include - -namespace oxenmq { - -using oxenc::bt_deserialize_invalid; -using oxenc::bt_deserialize_invalid_type; -using oxenc::bt_serializer; -using oxenc::bt_serialize; -using oxenc::bt_deserialize; -using oxenc::bt_get; -using oxenc::get_int; -using oxenc::get_tuple; -using oxenc::bt_dict_consumer; -using oxenc::bt_list_consumer; - -} // namespace oxenmq diff --git a/oxenmq/bt_value.h b/oxenmq/bt_value.h deleted file mode 100644 index 14cd186..0000000 --- a/oxenmq/bt_value.h +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright (c) 2019-2021, The Oxen Project -// -// All rights reserved. -// -// Redistribution and use in source and binary forms, with or without modification, are -// permitted provided that the following conditions are met: -// -// 1. Redistributions of source code must retain the above copyright notice, this list of -// conditions and the following disclaimer. -// -// 2. Redistributions in binary form must reproduce the above copyright notice, this list -// of conditions and the following disclaimer in the documentation and/or other -// materials provided with the distribution. -// -// 3. Neither the name of the copyright holder nor the names of its contributors may be -// used to endorse or promote products derived from this software without specific -// prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY -// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF -// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL -// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, -// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF -// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -#pragma once - -// Compatibility shim for oxenc includes - -#include - -namespace oxenmq { - -using oxenc::bt_value; -using oxenc::bt_dict; -using oxenc::bt_list; -using oxenc::bt_variant; - -using oxenc::has_alternative; -using oxenc::has_alternative_v; - -} From df19d1dd9448d237d40a13d1746060be1a9db6f2 Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Wed, 28 Sep 2022 13:57:07 -0300 Subject: [PATCH 3/5] Add sid workaround lsb_release -sc on sid currently prints 'n/a' because of debian bugs 1020893 and 1008735. Add a workaround. Also bumps clang builds to latest version. --- .drone.jsonnet | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/.drone.jsonnet b/.drone.jsonnet index 005da65..e39873d 100644 --- a/.drone.jsonnet +++ b/.drone.jsonnet @@ -19,6 +19,7 @@ local debian_pipeline(name, cmake_extra='', build_type='Release', extra_cmds=[], + distro='$$(lsb_release -sc)', allow_fail=false) = { kind: 'pipeline', type: 'docker', @@ -39,7 +40,7 @@ local debian_pipeline(name, apt_get_quiet + 'install -y eatmydata', 'eatmydata ' + apt_get_quiet + ' install --no-install-recommends -y lsb-release', 'cp contrib/deb.oxen.io.gpg /etc/apt/trusted.gpg.d', - 'echo deb http://deb.oxen.io $$(lsb_release -sc) main >/etc/apt/sources.list.d/oxen.list', + 'echo deb http://deb.oxen.io ' + distro + ' main >/etc/apt/sources.list.d/oxen.list', 'eatmydata ' + apt_get_quiet + ' update', 'eatmydata ' + apt_get_quiet + 'dist-upgrade -y', 'eatmydata ' + apt_get_quiet + 'install -y cmake git ninja-build pkg-config ccache ' + std.join(' ', deps), @@ -56,6 +57,7 @@ local debian_pipeline(name, local clang(version) = debian_pipeline( 'Debian sid/clang-' + version + ' (amd64)', docker_base + 'debian-sid-clang', + distro='sid', deps=['clang-' + version] + default_deps_nocxx, cmake_extra='-DCMAKE_C_COMPILER=clang-' + version + ' -DCMAKE_CXX_COMPILER=clang++-' + version + ' ' ); @@ -63,6 +65,7 @@ local clang(version) = debian_pipeline( local full_llvm(version) = debian_pipeline( 'Debian sid/llvm-' + version + ' (amd64)', docker_base + 'debian-sid-clang', + distro='sid', deps=['clang-' + version, 'lld-' + version, 'libc++-' + version + '-dev', 'libc++abi-' + version + '-dev'] + default_deps_nocxx, cmake_extra='-DCMAKE_C_COMPILER=clang-' + version + @@ -76,13 +79,13 @@ local full_llvm(version) = debian_pipeline( [ - debian_pipeline('Debian sid (amd64)', docker_base + 'debian-sid'), - debian_pipeline('Debian sid/Debug (amd64)', docker_base + 'debian-sid', build_type='Debug'), - clang(13), - full_llvm(13), + debian_pipeline('Debian sid (amd64)', docker_base + 'debian-sid', distro='sid'), + debian_pipeline('Debian sid/Debug (amd64)', docker_base + 'debian-sid', build_type='Debug', distro='sid'), + clang(14), + full_llvm(14), debian_pipeline('Debian buster (amd64)', docker_base + 'debian-buster'), debian_pipeline('Debian stable (i386)', docker_base + 'debian-stable/i386'), - debian_pipeline('Debian sid (ARM64)', docker_base + 'debian-sid', arch='arm64'), + debian_pipeline('Debian sid (ARM64)', docker_base + 'debian-sid', arch='arm64', distro='sid'), debian_pipeline('Debian stable (armhf)', docker_base + 'debian-stable/arm32v7', arch='arm64'), debian_pipeline('Debian buster (armhf)', docker_base + 'debian-buster/arm32v7', arch='arm64'), debian_pipeline('Ubuntu focal (amd64)', docker_base + 'ubuntu-focal'), From 85437d167b5df03cdbd2760c4964acc00b54fe00 Mon Sep 17 00:00:00 2001 From: Thomas Winget Date: Mon, 26 Sep 2022 10:26:24 -0400 Subject: [PATCH 4/5] initial implementation of generic pub/sub management Implements a generic pub/sub system for RPC endpoints to allow clients to subscribe to things. patch version bump tests included and passing --- CMakeLists.txt | 2 +- oxenmq/pubsub.h | 172 ++++++++++++ tests/CMakeLists.txt | 1 + tests/test_pubsub.cpp | 611 ++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 785 insertions(+), 1 deletion(-) create mode 100644 oxenmq/pubsub.h create mode 100644 tests/test_pubsub.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 10667af..ba0be63 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -17,7 +17,7 @@ cmake_minimum_required(VERSION 3.7) set(CMAKE_OSX_DEPLOYMENT_TARGET 10.12 CACHE STRING "macOS deployment target (Apple clang only)") project(liboxenmq - VERSION 1.2.13 + VERSION 1.2.14 LANGUAGES CXX C) include(GNUInstallDirs) diff --git a/oxenmq/pubsub.h b/oxenmq/pubsub.h new file mode 100644 index 0000000..5669a3d --- /dev/null +++ b/oxenmq/pubsub.h @@ -0,0 +1,172 @@ +#pragma once + +#include "connections.h" + +#include +#include +#include +#include +#include +#include +#include + +namespace oxenmq { + +using namespace std::chrono_literals; + +namespace detail { + struct no_data_t final {}; + inline constexpr no_data_t no_data{}; + + template + struct SubData { + std::chrono::steady_clock::time_point expiry; + UserData user_data; + explicit SubData(std::chrono::steady_clock::time_point _exp) + : expiry{_exp}, user_data{} {} + }; + + template <> + struct SubData { + std::chrono::steady_clock::time_point expiry; + }; +} + + +/** + * OMQ Subscription class. Handles pub/sub connections such that the user only needs to call + * methods to subscribe and publish. + * + * FIXME: do we want an unsubscribe, or is expiry / conn management sufficient? + * + * Type UserData can contain whatever information the user may need at publish time, for example if + * the subscription is for logs the subscriber can specify log levels or categories, and the + * publisher can choose to send or not based on those. The UserData type, if provided and non-void, + * must be default constructible, must be comparable with ==, and must be movable. + */ +template +class Subscription { + static constexpr bool have_user_data = !std::is_void_v; + using UserData_if_present = std::conditional_t; + using subdata_t = detail::SubData; + + std::unordered_map subs; + std::shared_mutex _mutex; + const std::string description; // description of the sub for logging + const std::chrono::milliseconds sub_duration; // extended by re-subscribe + +public: + + Subscription() = delete; + Subscription(std::string description, std::chrono::milliseconds sub_duration = 30min) + : description{std::move(description)}, sub_duration{sub_duration} {} + + // returns true if new sub, false if refresh sub. throws on error. `data` will be checked + // against the existing data: if there is existing data and it compares `==` to the given value, + // false is returned (and the existing data is not replaced). Otherwise the given data gets + // stored for this connection (replacing existing data, if present), and true is returned. + bool subscribe(const ConnectionID& conn, UserData_if_present data) { + std::unique_lock lock{_mutex}; + auto expiry = std::chrono::steady_clock::now() + sub_duration; + auto [value, added] = subs.emplace(conn, subdata_t{expiry}); + if (added) { + if constexpr (have_user_data) + value->second.user_data = std::move(data); + return true; + } + + value->second.expiry = expiry; + + if constexpr (have_user_data) { + // if user_data changed, consider it a new sub rather than refresh, and update + // user_data in the mapped value. + if (!(value->second.user_data == data)) { + value->second.user_data = std::move(data); + return true; + } + } + return false; + } + + // no-user-data version, only available for Subscription (== Subscription without a + // UserData type). + template = 0> + bool subscribe(const ConnectionID& conn) { + return subscribe(conn, detail::no_data); + } + + // unsubscribe a connection ID. return the user data, if a sub was present. + template = 0> + std::optional unsubscribe(const ConnectionID& conn) { + std::unique_lock lock{_mutex}; + + auto node = subs.extract(conn); + if (!node.empty()) + return node.mapped().user_data; + + return std::nullopt; + } + + // no-user-data version, only available for Subscription (== Subscription without a + // UserData type). + template = 0> + bool unsubscribe(const ConnectionID& conn) { + std::unique_lock lock{_mutex}; + auto node = subs.extract(conn); + return !node.empty(); // true if removed, false if wasn't present + } + + // force removal of expired subscriptions. removal will otherwise only happen on publish + void remove_expired() { + std::unique_lock lock{_mutex}; + auto now = std::chrono::steady_clock::now(); + for (auto itr = subs.begin(); itr != subs.end();) { + if (itr->second.expiry < now) + itr = subs.erase(itr); + else + itr++; + } + } + + // Func is any callable which takes: + // - (const ConnectionID&, const UserData&) for Subscription with non-void UserData + // - (const ConnectionID&) for Subscription. + template + void publish(Func&& func) { + std::vector to_remove; + { + std::shared_lock lock(_mutex); + if (subs.empty()) + return; + + auto now = std::chrono::steady_clock::now(); + + for (const auto& [conn, sub] : subs) { + if (sub.expiry < now) + to_remove.push_back(conn); + else if constexpr (have_user_data) + func(conn, sub.user_data); + else + func(conn); + } + } + + if (to_remove.empty()) + return; + + std::unique_lock lock{_mutex}; + auto now = std::chrono::steady_clock::now(); + for (auto& conn : to_remove) { + auto it = subs.find(conn); + if (it != subs.end() && it->second.expiry < now /* recheck: client might have resubscribed in between locks */) { + subs.erase(it); + } + } + } + +}; + + +} // namespace oxenmq + +// vim:sw=4:et diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index fadf85a..20e2282 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -9,6 +9,7 @@ add_executable(tests test_commands.cpp test_failures.cpp test_inject.cpp + test_pubsub.cpp test_requests.cpp test_socket_limit.cpp test_tagged_threads.cpp diff --git a/tests/test_pubsub.cpp b/tests/test_pubsub.cpp new file mode 100644 index 0000000..75015be --- /dev/null +++ b/tests/test_pubsub.cpp @@ -0,0 +1,611 @@ +#include "common.h" +#include "oxenmq/pubsub.h" + +#include + +using namespace oxenmq; +using namespace std::chrono_literals; + +TEST_CASE("sub OK", "[pubsub]") { + std::string listen = random_localhost(); + OxenMQ server{ + "", "", // generate ephemeral keys + false, // not a service node + [](auto) { return ""; }, + }; + server.listen_curve(listen); + + Subscription<> greetings{"greetings"}; + + std::atomic is_new{false}; + server.add_category("public", Access{AuthLevel::none}); + server.add_request_command("public", "greetings", [&](Message& m) { + is_new = greetings.subscribe(m.conn); + m.send_reply("OK"); + }); + server.start(); + + OxenMQ client( + [](LogLevel, const char* file, int line, std::string msg) { std::cerr << file << ":" << line << " --C-- " << msg << "\n"; } + ); + + std::atomic reply_count{0}; + client.add_category("notify", Access{AuthLevel::none}); + client.add_command("notify", "greetings", [&](Message& m) { + const auto& data = m.data; + if (!data.size()) + { + std::cerr << "client received public.greetings with empty data\n"; + return; + } + if (data[0] == "hello") + reply_count++; + }); + + client.start(); + + std::atomic connected{false}, failed{false}; + std::string pubkey; + + auto c = client.connect_remote(address{listen, server.get_pubkey()}, + [&](auto conn) { pubkey = conn.pubkey(); connected = true; }, + [&](auto, auto) { failed = true; }); + + wait_for([&] { return connected || failed; }); + { + auto lock = catch_lock(); + REQUIRE( connected ); + REQUIRE_FALSE( failed ); + REQUIRE( oxenc::to_hex(pubkey) == oxenc::to_hex(server.get_pubkey()) ); + } + + std::atomic got_reply{false}; + bool success; + std::vector data; + client.request(c, "public.greetings", [&](bool ok, std::vector data_) { + got_reply = true; + success = ok; + data = std::move(data_); + }); + + reply_sleep(); + { + auto lock = catch_lock(); + REQUIRE( got_reply.load() ); + REQUIRE( success ); + REQUIRE( data == std::vector{{"OK"}} ); + } + + greetings.publish([&](auto& conn) { + server.send(conn, "notify.greetings", "hello"); + }); + + reply_sleep(); + { + auto lock = catch_lock(); + REQUIRE( reply_count == 1 ); + } + + greetings.publish([&](auto& conn) { + server.send(conn, "notify.greetings", "hello"); + }); + + reply_sleep(); + { + auto lock = catch_lock(); + REQUIRE( reply_count == 2 ); + } + +} + +TEST_CASE("user data", "[pubsub]") { + std::string listen = random_localhost(); + OxenMQ server{ + "", "", // generate ephemeral keys + false, // not a service node + [](auto) { return ""; }, + }; + server.listen_curve(listen); + + Subscription greetings{"greetings"}; + + std::atomic is_new{false}; + server.add_category("public", Access{AuthLevel::none}); + server.add_request_command("public", "greetings", [&](Message& m) { + is_new = greetings.subscribe(m.conn, std::string{m.data[0]}); + m.send_reply("OK"); + }); + server.start(); + + OxenMQ client( + [](LogLevel, const char* file, int line, std::string msg) { std::cerr << file << ":" << line << " --C-- " << msg << "\n"; } + ); + + std::string response{"foo"}; + std::atomic reply_count{0}; + std::atomic foo_count{0}; + client.add_category("notify", Access{AuthLevel::none}); + client.add_command("notify", "greetings", [&](Message& m) { + const auto& data = m.data; + if (!data.size()) + { + std::cerr << "client received public.greetings with empty data\n"; + return; + } + if (data[0] == response) + reply_count++; + if (data[0] == "foo") + foo_count++; + }); + + client.start(); + + std::atomic connected{false}, failed{false}; + std::string pubkey; + + auto c = client.connect_remote(address{listen, server.get_pubkey()}, + [&](auto conn) { pubkey = conn.pubkey(); connected = true; }, + [&](auto, auto) { failed = true; }); + + wait_for([&] { return connected || failed; }); + { + auto lock = catch_lock(); + REQUIRE( connected ); + REQUIRE_FALSE( failed ); + REQUIRE( oxenc::to_hex(pubkey) == oxenc::to_hex(server.get_pubkey()) ); + } + + std::atomic got_reply{false}; + std::atomic success; + std::vector data; + client.request(c, "public.greetings", [&](bool ok, std::vector data_) { + got_reply = true; + success = ok; + data = std::move(data_); + }, response); + + reply_sleep(); + { + auto lock = catch_lock(); + REQUIRE( got_reply.load() ); + REQUIRE( success ); + REQUIRE( is_new ); + REQUIRE( data == std::vector{{"OK"}} ); + } + + got_reply = false; + success = false; + client.request(c, "public.greetings", [&](bool ok, std::vector data_) { + got_reply = true; + success = ok; + data = std::move(data_); + }, response); + + reply_sleep(); + { + auto lock = catch_lock(); + REQUIRE( got_reply.load() ); + REQUIRE( success ); + REQUIRE_FALSE( is_new ); + REQUIRE( data == std::vector{{"OK"}} ); + } + + greetings.publish([&](auto& conn, std::string user) { + server.send(conn, "notify.greetings", user); + }); + + reply_sleep(); + { + auto lock = catch_lock(); + REQUIRE( reply_count == 1 ); + REQUIRE( foo_count == 1 ); + } + + got_reply = false; + success = false; + response = "bar"; + client.request(c, "public.greetings", [&](bool ok, std::vector data_) { + got_reply = true; + success = ok; + data = std::move(data_); + }, response); + + reply_sleep(); + { + auto lock = catch_lock(); + REQUIRE( got_reply.load() ); + REQUIRE( success ); + REQUIRE( is_new ); + REQUIRE( data == std::vector{{"OK"}} ); + } + + greetings.publish([&](auto& conn, std::string user) { + server.send(conn, "notify.greetings", user); + }); + + reply_sleep(); + { + auto lock = catch_lock(); + REQUIRE( reply_count == 2 ); + REQUIRE( foo_count == 1 ); + } + +} + +TEST_CASE("unsubscribe", "[pubsub]") { + std::string listen = random_localhost(); + OxenMQ server{ + "", "", // generate ephemeral keys + false, // not a service node + [](auto) { return ""; }, + }; + server.listen_curve(listen); + + Subscription<> greetings{"greetings"}; + + std::atomic was_subbed{false}; + server.add_category("public", Access{AuthLevel::none}); + server.add_request_command("public", "greetings", [&](Message& m) { + greetings.subscribe(m.conn); + m.send_reply("OK"); + }); + server.add_request_command("public", "goodbye", [&](Message& m) { + was_subbed = greetings.unsubscribe(m.conn); + m.send_reply("OK"); + }); + server.start(); + + OxenMQ client( + [](LogLevel, const char* file, int line, std::string msg) { std::cerr << file << ":" << line << " --C-- " << msg << "\n"; } + ); + + std::atomic reply_count{0}; + client.add_category("notify", Access{AuthLevel::none}); + client.add_command("notify", "greetings", [&](Message& m) { + const auto& data = m.data; + if (!data.size()) + { + std::cerr << "client received public.greetings with empty data\n"; + return; + } + if (data[0] == "hello") + reply_count++; + }); + + client.start(); + + std::atomic connected{false}, failed{false}; + std::string pubkey; + + auto c = client.connect_remote(address{listen, server.get_pubkey()}, + [&](auto conn) { pubkey = conn.pubkey(); connected = true; }, + [&](auto, auto) { failed = true; }); + + wait_for([&] { return connected || failed; }); + { + auto lock = catch_lock(); + REQUIRE( connected ); + REQUIRE_FALSE( failed ); + REQUIRE( oxenc::to_hex(pubkey) == oxenc::to_hex(server.get_pubkey()) ); + } + + std::atomic got_reply{false}; + std::atomic success; + std::vector data; + client.request(c, "public.greetings", [&](bool ok, std::vector data_) { + got_reply = true; + success = ok; + data = std::move(data_); + }); + + reply_sleep(); + { + auto lock = catch_lock(); + REQUIRE( got_reply.load() ); + REQUIRE( success ); + REQUIRE( data == std::vector{{"OK"}} ); + } + + greetings.publish([&](auto& conn) { + server.send(conn, "notify.greetings", "hello"); + }); + + reply_sleep(); + { + auto lock = catch_lock(); + REQUIRE( reply_count == 1 ); + } + + got_reply = false; + success = false; + client.request(c, "public.goodbye", [&](bool ok, std::vector data_) { + got_reply = true; + success = ok; + data = std::move(data_); + }); + + reply_sleep(); + { + auto lock = catch_lock(); + REQUIRE( got_reply.load() ); + REQUIRE( success ); + REQUIRE( data == std::vector{{"OK"}} ); + REQUIRE( was_subbed ); + } + + greetings.publish([&](auto& conn) { + server.send(conn, "notify.greetings", "hello"); + }); + + reply_sleep(); + { + auto lock = catch_lock(); + REQUIRE( reply_count == 1 ); + } + + got_reply = false; + success = false; + client.request(c, "public.goodbye", [&](bool ok, std::vector data_) { + got_reply = true; + success = ok; + data = std::move(data_); + }); + + reply_sleep(); + { + auto lock = catch_lock(); + REQUIRE( got_reply.load() ); + REQUIRE( success ); + REQUIRE( data == std::vector{{"OK"}} ); + REQUIRE( was_subbed == false); + } + +} + +TEST_CASE("expire", "[pubsub]") { + std::string listen = random_localhost(); + OxenMQ server{ + "", "", // generate ephemeral keys + false, // not a service node + [](auto) { return ""; }, + }; + server.listen_curve(listen); + + Subscription<> greetings{"greetings", 250ms}; + + std::atomic was_subbed{false}; + server.add_category("public", Access{AuthLevel::none}); + server.add_request_command("public", "greetings", [&](Message& m) { + greetings.subscribe(m.conn); + m.send_reply("OK"); + }); + server.add_request_command("public", "goodbye", [&](Message& m) { + was_subbed = greetings.unsubscribe(m.conn); + m.send_reply("OK"); + }); + server.start(); + + OxenMQ client( + [](LogLevel, const char* file, int line, std::string msg) { std::cerr << file << ":" << line << " --C-- " << msg << "\n"; } + ); + + std::atomic reply_count{0}; + client.add_category("notify", Access{AuthLevel::none}); + client.add_command("notify", "greetings", [&](Message& m) { + const auto& data = m.data; + if (!data.size()) + { + std::cerr << "client received public.greetings with empty data\n"; + return; + } + if (data[0] == "hello") + reply_count++; + }); + + client.start(); + + std::atomic connected{false}, failed{false}; + std::string pubkey; + + auto c = client.connect_remote(address{listen, server.get_pubkey()}, + [&](auto conn) { pubkey = conn.pubkey(); connected = true; }, + [&](auto, auto) { failed = true; }); + + wait_for([&] { return connected || failed; }); + { + auto lock = catch_lock(); + REQUIRE( connected ); + REQUIRE_FALSE( failed ); + REQUIRE( oxenc::to_hex(pubkey) == oxenc::to_hex(server.get_pubkey()) ); + } + + std::atomic got_reply{false}; + bool success; + std::vector data; + client.request(c, "public.greetings", [&](bool ok, std::vector data_) { + got_reply = true; + success = ok; + data = std::move(data_); + }); + + reply_sleep(); + { + auto lock = catch_lock(); + REQUIRE( got_reply.load() ); + REQUIRE( success ); + REQUIRE( data == std::vector{{"OK"}} ); + } + + // should be expired by now + std::this_thread::sleep_for(500ms); + + greetings.remove_expired(); + + got_reply = false; + success = false; + client.request(c, "public.goodbye", [&](bool ok, std::vector data_) { + got_reply = true; + success = ok; + data = std::move(data_); + }); + + reply_sleep(); + { + auto lock = catch_lock(); + REQUIRE( got_reply.load() ); + REQUIRE( success ); + REQUIRE( data == std::vector{{"OK"}} ); + REQUIRE( was_subbed == false); + } + +} + +TEST_CASE("multiple subs", "[pubsub]") { + std::string listen = random_localhost(); + OxenMQ server{ + "", "", // generate ephemeral keys + false, // not a service node + [](auto) { return ""; }, + }; + server.listen_curve(listen); + + Subscription<> greetings{"greetings"}; + + std::atomic is_new{false}; + server.add_category("public", Access{AuthLevel::none}); + server.add_request_command("public", "greetings", [&](Message& m) { + is_new = greetings.subscribe(m.conn); + m.send_reply("OK"); + }); + server.start(); + +/* client 1 */ + std::atomic reply_count_c1{0}; + std::atomic connected_c1{false}, failed_c1{false}; + std::atomic got_reply_c1{false}; + bool success_c1; + std::vector data_c1; + std::string pubkey_c1; + OxenMQ client1( + [](LogLevel, const char* file, int line, std::string msg) { std::cerr << file << ":" << line << " --C-- " << msg << "\n"; } + ); + + client1.add_category("notify", Access{AuthLevel::none}); + client1.add_command("notify", "greetings", [&](Message& m) { + const auto& data = m.data; + if (!data.size()) + { + std::cerr << "client received public.greetings with empty data\n"; + return; + } + if (data[0] == "hello") + reply_count_c1++; + }); + + client1.start(); + + auto c1 = client1.connect_remote(address{listen, server.get_pubkey()}, + [&](auto conn) { pubkey_c1 = conn.pubkey(); connected_c1 = true; }, + [&](auto, auto) { failed_c1 = true; }); + + wait_for([&] { return connected_c1 || failed_c1; }); + { + auto lock = catch_lock(); + REQUIRE( connected_c1 ); + REQUIRE_FALSE( failed_c1 ); + REQUIRE( oxenc::to_hex(pubkey_c1) == oxenc::to_hex(server.get_pubkey()) ); + } + + client1.request(c1, "public.greetings", [&](bool ok, std::vector data_) { + got_reply_c1 = true; + success_c1 = ok; + data_c1 = std::move(data_); + }); + + reply_sleep(); + { + auto lock = catch_lock(); + REQUIRE( got_reply_c1.load() ); + REQUIRE( success_c1 ); + REQUIRE( data_c1 == std::vector{{"OK"}} ); + } +/* end client 1 */ + +/* client 2 */ + std::atomic reply_count_c2{0}; + std::atomic connected_c2{false}, failed_c2{false}; + std::atomic got_reply_c2{false}; + bool success_c2; + std::vector data_c2; + std::string pubkey_c2; + OxenMQ client2( + [](LogLevel, const char* file, int line, std::string msg) { std::cerr << file << ":" << line << " --C-- " << msg << "\n"; } + ); + + client2.add_category("notify", Access{AuthLevel::none}); + client2.add_command("notify", "greetings", [&](Message& m) { + const auto& data = m.data; + if (!data.size()) + { + std::cerr << "client received public.greetings with empty data\n"; + return; + } + if (data[0] == "hello") + reply_count_c2++; + }); + + client2.start(); + + auto c2 = client2.connect_remote(address{listen, server.get_pubkey()}, + [&](auto conn) { pubkey_c2 = conn.pubkey(); connected_c2 = true; }, + [&](auto, auto) { failed_c2 = true; }); + + wait_for([&] { return connected_c2 || failed_c2; }); + { + auto lock = catch_lock(); + REQUIRE( connected_c2 ); + REQUIRE_FALSE( failed_c2 ); + REQUIRE( oxenc::to_hex(pubkey_c2) == oxenc::to_hex(server.get_pubkey()) ); + } + + client2.request(c2, "public.greetings", [&](bool ok, std::vector data_) { + got_reply_c2 = true; + success_c2 = ok; + data_c2 = std::move(data_); + }); + + reply_sleep(); + { + auto lock = catch_lock(); + REQUIRE( got_reply_c2.load() ); + REQUIRE( success_c2 ); + REQUIRE( data_c2 == std::vector{{"OK"}} ); + } +/* end client2 */ + + greetings.publish([&](auto& conn) { + server.send(conn, "notify.greetings", "hello"); + }); + + reply_sleep(); + { + auto lock = catch_lock(); + REQUIRE( reply_count_c1 == 1 ); + REQUIRE( reply_count_c2 == 1 ); + } + + greetings.publish([&](auto& conn) { + server.send(conn, "notify.greetings", "hello"); + }); + + reply_sleep(); + { + auto lock = catch_lock(); + REQUIRE( reply_count_c1 == 2 ); + REQUIRE( reply_count_c2 == 2 ); + } + +} + + +// vim:sw=4:et From 445f214840c2ba2fe4fc89f9f4592af23a3f0f91 Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Wed, 5 Oct 2022 18:01:40 -0300 Subject: [PATCH 5/5] Fix a race condition with tagged thread startup There's a very rare race condition where a tagged thread doesn't seem to exist when the proxy tries syncing startup with them, and so the proxy thread hangs in startup. This addresses it by avoiding looking at the `proxy_thread` variable (which probably isn't thread safe) in the worker's startup, and signalling the you-need-to-shutdown condition via a third option for the (formerly boolean) `tagged_go`. --- oxenmq/oxenmq.cpp | 2 +- oxenmq/oxenmq.h | 3 ++- oxenmq/proxy.cpp | 2 +- oxenmq/worker.cpp | 4 ++-- 4 files changed, 6 insertions(+), 5 deletions(-) diff --git a/oxenmq/oxenmq.cpp b/oxenmq/oxenmq.cpp index 73706bd..06bfdd2 100644 --- a/oxenmq/oxenmq.cpp +++ b/oxenmq/oxenmq.cpp @@ -431,7 +431,7 @@ OxenMQ::~OxenMQ() { // up, so signal them so that they can end themselves. { std::lock_guard lock{tagged_startup_mutex}; - tagged_go = true; + tagged_go = tagged_go_mode::SHUTDOWN; } tagged_cv.notify_all(); for (auto& [run, busy, queue] : tagged_workers) diff --git a/oxenmq/oxenmq.h b/oxenmq/oxenmq.h index a3e3361..fe53627 100644 --- a/oxenmq/oxenmq.h +++ b/oxenmq/oxenmq.h @@ -788,7 +788,8 @@ private: /// then wait via this bool/c.v. to synchronize startup with the proxy thread. This mutex isn't /// used after startup is complete. std::mutex tagged_startup_mutex; - bool tagged_go{false}; + enum class tagged_go_mode { WAIT, GO, SHUTDOWN }; + tagged_go_mode tagged_go = tagged_go_mode::WAIT; std::condition_variable tagged_cv; public: diff --git a/oxenmq/proxy.cpp b/oxenmq/proxy.cpp index 5732fe1..aa0c166 100644 --- a/oxenmq/proxy.cpp +++ b/oxenmq/proxy.cpp @@ -455,7 +455,7 @@ void OxenMQ::proxy_loop_init() { OMQ_LOG(debug, "Waiting for tagged workers"); { std::unique_lock lock{tagged_startup_mutex}; - tagged_go = true; + tagged_go = tagged_go_mode::GO; } tagged_cv.notify_all(); std::unordered_set waiting_on; diff --git a/oxenmq/worker.cpp b/oxenmq/worker.cpp index fb54735..b54ad93 100644 --- a/oxenmq/worker.cpp +++ b/oxenmq/worker.cpp @@ -71,9 +71,9 @@ void OxenMQ::worker_thread(unsigned int index, std::optional tagged // is running). { std::unique_lock lock{tagged_startup_mutex}; - tagged_cv.wait(lock, [this] { return tagged_go; }); + tagged_cv.wait(lock, [this] { return tagged_go != tagged_go_mode::WAIT; }); } - if (!proxy_thread.joinable()) // OxenMQ destroyed without starting + if (tagged_go == tagged_go_mode::SHUTDOWN) // OxenMQ destroyed without starting return; tagged_socket.emplace(context, zmq::socket_type::dealer); }