mirror of https://github.com/oxen-io/oxen-mq.git
Merge branch 'stable' into ubuntu/focal
This commit is contained in:
commit
f918110a2e
|
@ -17,7 +17,7 @@ cmake_minimum_required(VERSION 3.7)
|
|||
set(CMAKE_OSX_DEPLOYMENT_TARGET 10.12 CACHE STRING "macOS deployment target (Apple clang only)")
|
||||
|
||||
project(liboxenmq
|
||||
VERSION 1.2.13
|
||||
VERSION 1.2.14
|
||||
LANGUAGES CXX C)
|
||||
|
||||
include(GNUInstallDirs)
|
||||
|
@ -42,15 +42,11 @@ endif()
|
|||
option(OXENMQ_BUILD_TESTS "Building and perform oxenmq tests" ${oxenmq_IS_TOPLEVEL_PROJECT})
|
||||
option(OXENMQ_INSTALL "Add oxenmq libraries and headers to cmake install target; defaults to ON if BUILD_SHARED_LIBS is enabled or we are the top-level project; OFF for a static subdirectory build" ${oxenmq_INSTALL_DEFAULT})
|
||||
option(OXENMQ_INSTALL_CPPZMQ "Install cppzmq header with oxenmq/ headers (requires OXENMQ_INSTALL)" ON)
|
||||
option(OXENMQ_LOKIMQ_COMPAT "Install lokimq compatibility headers and pkg-config for rename migration" ON)
|
||||
|
||||
list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake")
|
||||
|
||||
configure_file(oxenmq/version.h.in oxenmq/version.h @ONLY)
|
||||
configure_file(liboxenmq.pc.in liboxenmq.pc @ONLY)
|
||||
if(OXENMQ_LOKIMQ_COMPAT)
|
||||
configure_file(liblokimq.pc.in liblokimq.pc @ONLY)
|
||||
endif()
|
||||
|
||||
|
||||
add_library(oxenmq
|
||||
|
@ -186,9 +182,6 @@ else()
|
|||
endif()
|
||||
|
||||
add_library(oxenmq::oxenmq ALIAS oxenmq)
|
||||
if(OXENMQ_LOKIMQ_COMPAT)
|
||||
add_library(lokimq::lokimq ALIAS oxenmq)
|
||||
endif()
|
||||
|
||||
export(
|
||||
TARGETS oxenmq
|
||||
|
@ -206,17 +199,12 @@ if(OXENMQ_INSTALL)
|
|||
install(
|
||||
FILES oxenmq/address.h
|
||||
oxenmq/auth.h
|
||||
oxenmq/base32z.h
|
||||
oxenmq/base64.h
|
||||
oxenmq/batch.h
|
||||
oxenmq/bt_serialize.h
|
||||
oxenmq/bt_value.h
|
||||
oxenmq/connections.h
|
||||
oxenmq/fmt.h
|
||||
oxenmq/hex.h
|
||||
oxenmq/oxenmq.h
|
||||
oxenmq/message.h
|
||||
oxenmq/variant.h
|
||||
oxenmq/oxenmq.h
|
||||
oxenmq/pubsub.h
|
||||
${CMAKE_CURRENT_BINARY_DIR}/oxenmq/version.h
|
||||
DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/oxenmq
|
||||
)
|
||||
|
@ -234,31 +222,6 @@ if(OXENMQ_INSTALL)
|
|||
DESTINATION ${CMAKE_INSTALL_LIBDIR}/pkgconfig
|
||||
)
|
||||
|
||||
if(OXENMQ_LOKIMQ_COMPAT)
|
||||
install(
|
||||
FILES lokimq/address.h
|
||||
lokimq/auth.h
|
||||
lokimq/base32z.h
|
||||
lokimq/base64.h
|
||||
lokimq/batch.h
|
||||
lokimq/bt_serialize.h
|
||||
lokimq/bt_value.h
|
||||
lokimq/connections.h
|
||||
lokimq/hex.h
|
||||
lokimq/lokimq.h
|
||||
lokimq/message.h
|
||||
lokimq/variant.h
|
||||
lokimq/version.h
|
||||
DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/lokimq
|
||||
)
|
||||
|
||||
install(
|
||||
FILES ${CMAKE_CURRENT_BINARY_DIR}/liblokimq.pc
|
||||
DESTINATION ${CMAKE_INSTALL_LIBDIR}/pkgconfig
|
||||
)
|
||||
endif()
|
||||
|
||||
|
||||
endif()
|
||||
|
||||
if(OXENMQ_BUILD_TESTS)
|
||||
|
|
|
@ -1,14 +0,0 @@
|
|||
prefix=@CMAKE_INSTALL_PREFIX@
|
||||
exec_prefix=${prefix}
|
||||
libdir=@CMAKE_INSTALL_FULL_LIBDIR@
|
||||
includedir=@CMAKE_INSTALL_FULL_INCLUDEDIR@
|
||||
|
||||
Name: liblokimq
|
||||
Description: ZeroMQ-based communication library (compatibility package for liboxenmq)
|
||||
Version: @PROJECT_VERSION@
|
||||
|
||||
Libs: -L${libdir} -loxenmq
|
||||
Libs.private: @PRIVATE_LIBS@
|
||||
Requires: liboxenc
|
||||
Requires.private: libzmq libsodium
|
||||
Cflags: -I${includedir}
|
|
@ -1,4 +0,0 @@
|
|||
#pragma once
|
||||
#include "../oxenmq/address.h"
|
||||
|
||||
namespace lokimq = oxenmq;
|
|
@ -1,4 +0,0 @@
|
|||
#pragma once
|
||||
#include "../oxenmq/auth.h"
|
||||
|
||||
namespace lokimq = oxenmq;
|
|
@ -1,4 +0,0 @@
|
|||
#pragma once
|
||||
#include "../oxenmq/base32z.h"
|
||||
|
||||
namespace lokimq = oxenmq;
|
|
@ -1,4 +0,0 @@
|
|||
#pragma once
|
||||
#include "../oxenmq/base64.h"
|
||||
|
||||
namespace lokimq = oxenmq;
|
|
@ -1,4 +0,0 @@
|
|||
#pragma once
|
||||
#include "../oxenmq/batch.h"
|
||||
|
||||
namespace lokimq = oxenmq;
|
|
@ -1,4 +0,0 @@
|
|||
#pragma once
|
||||
#include "../oxenmq/bt_serialize.h"
|
||||
|
||||
namespace lokimq = oxenmq;
|
|
@ -1,4 +0,0 @@
|
|||
#pragma once
|
||||
#include "../oxenmq/bt_value.h"
|
||||
|
||||
namespace lokimq = oxenmq;
|
|
@ -1,4 +0,0 @@
|
|||
#pragma once
|
||||
#include "../oxenmq/connections.h"
|
||||
|
||||
namespace lokimq = oxenmq;
|
|
@ -1,4 +0,0 @@
|
|||
#pragma once
|
||||
#include "../oxenmq/hex.h"
|
||||
|
||||
namespace lokimq = oxenmq;
|
|
@ -1,10 +0,0 @@
|
|||
#pragma once
|
||||
#include "../oxenmq/oxenmq.h"
|
||||
|
||||
namespace lokimq = oxenmq;
|
||||
|
||||
namespace oxenmq {
|
||||
|
||||
using LokiMQ = OxenMQ;
|
||||
|
||||
}
|
|
@ -1,4 +0,0 @@
|
|||
#pragma once
|
||||
#include "../oxenmq/message.h"
|
||||
|
||||
namespace lokimq = oxenmq;
|
|
@ -1,2 +0,0 @@
|
|||
#pragma once
|
||||
#include "../oxenmq/variant.h"
|
|
@ -1,4 +0,0 @@
|
|||
#pragma once
|
||||
#include "../oxenmq/version.h"
|
||||
|
||||
namespace lokimq = oxenmq;
|
|
@ -1,45 +0,0 @@
|
|||
// Copyright (c) 2019-2021, The Oxen Project
|
||||
//
|
||||
// All rights reserved.
|
||||
//
|
||||
// Redistribution and use in source and binary forms, with or without modification, are
|
||||
// permitted provided that the following conditions are met:
|
||||
//
|
||||
// 1. Redistributions of source code must retain the above copyright notice, this list of
|
||||
// conditions and the following disclaimer.
|
||||
//
|
||||
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
|
||||
// of conditions and the following disclaimer in the documentation and/or other
|
||||
// materials provided with the distribution.
|
||||
//
|
||||
// 3. Neither the name of the copyright holder nor the names of its contributors may be
|
||||
// used to endorse or promote products derived from this software without specific
|
||||
// prior written permission.
|
||||
//
|
||||
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
|
||||
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
|
||||
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
|
||||
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
|
||||
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
||||
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
|
||||
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
|
||||
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
|
||||
#pragma once
|
||||
|
||||
// Compatibility shim for oxenc includes
|
||||
//
|
||||
#include <oxenc/base32z.h>
|
||||
|
||||
namespace oxenmq {
|
||||
|
||||
using oxenc::to_base32z_size;
|
||||
using oxenc::from_base32z_size;
|
||||
using oxenc::base32z_encoder;
|
||||
using oxenc::to_base32z;
|
||||
using oxenc::is_base32z;
|
||||
using oxenc::base32z_decoder;
|
||||
using oxenc::from_base32z;
|
||||
|
||||
}
|
|
@ -1,46 +0,0 @@
|
|||
// Copyright (c) 2019-2021, The Oxen Project
|
||||
//
|
||||
// All rights reserved.
|
||||
//
|
||||
// Redistribution and use in source and binary forms, with or without modification, are
|
||||
// permitted provided that the following conditions are met:
|
||||
//
|
||||
// 1. Redistributions of source code must retain the above copyright notice, this list of
|
||||
// conditions and the following disclaimer.
|
||||
//
|
||||
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
|
||||
// of conditions and the following disclaimer in the documentation and/or other
|
||||
// materials provided with the distribution.
|
||||
//
|
||||
// 3. Neither the name of the copyright holder nor the names of its contributors may be
|
||||
// used to endorse or promote products derived from this software without specific
|
||||
// prior written permission.
|
||||
//
|
||||
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
|
||||
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
|
||||
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
|
||||
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
|
||||
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
||||
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
|
||||
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
|
||||
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
|
||||
#pragma once
|
||||
|
||||
// Compatibility shim for oxenc includes
|
||||
|
||||
#include <oxenc/base64.h>
|
||||
|
||||
namespace oxenmq {
|
||||
|
||||
using oxenc::to_base64_size;
|
||||
using oxenc::from_base64_size;
|
||||
using oxenc::base64_encoder;
|
||||
using oxenc::to_base64;
|
||||
using oxenc::to_base64_unpadded;
|
||||
using oxenc::is_base64;
|
||||
using oxenc::base64_decoder;
|
||||
using oxenc::from_base64;
|
||||
|
||||
}
|
|
@ -1,12 +0,0 @@
|
|||
#pragma once
|
||||
|
||||
#include <oxenc/bt_producer.h>
|
||||
|
||||
// Compatibility shim for oxenc includes
|
||||
|
||||
namespace oxenmq {
|
||||
|
||||
using oxenc::bt_list_producer;
|
||||
using oxenc::bt_dict_producer;
|
||||
|
||||
} // namespace oxenmq
|
|
@ -1,48 +0,0 @@
|
|||
// Copyright (c) 2019-2020, The Oxen Project
|
||||
//
|
||||
// All rights reserved.
|
||||
//
|
||||
// Redistribution and use in source and binary forms, with or without modification, are
|
||||
// permitted provided that the following conditions are met:
|
||||
//
|
||||
// 1. Redistributions of source code must retain the above copyright notice, this list of
|
||||
// conditions and the following disclaimer.
|
||||
//
|
||||
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
|
||||
// of conditions and the following disclaimer in the documentation and/or other
|
||||
// materials provided with the distribution.
|
||||
//
|
||||
// 3. Neither the name of the copyright holder nor the names of its contributors may be
|
||||
// used to endorse or promote products derived from this software without specific
|
||||
// prior written permission.
|
||||
//
|
||||
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
|
||||
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
|
||||
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
|
||||
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
|
||||
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
||||
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
|
||||
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
|
||||
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
|
||||
#pragma once
|
||||
|
||||
// Compatibility shim for oxenc includes
|
||||
|
||||
#include <oxenc/bt_serialize.h>
|
||||
|
||||
namespace oxenmq {
|
||||
|
||||
using oxenc::bt_deserialize_invalid;
|
||||
using oxenc::bt_deserialize_invalid_type;
|
||||
using oxenc::bt_serializer;
|
||||
using oxenc::bt_serialize;
|
||||
using oxenc::bt_deserialize;
|
||||
using oxenc::bt_get;
|
||||
using oxenc::get_int;
|
||||
using oxenc::get_tuple;
|
||||
using oxenc::bt_dict_consumer;
|
||||
using oxenc::bt_list_consumer;
|
||||
|
||||
} // namespace oxenmq
|
|
@ -1,45 +0,0 @@
|
|||
// Copyright (c) 2019-2021, The Oxen Project
|
||||
//
|
||||
// All rights reserved.
|
||||
//
|
||||
// Redistribution and use in source and binary forms, with or without modification, are
|
||||
// permitted provided that the following conditions are met:
|
||||
//
|
||||
// 1. Redistributions of source code must retain the above copyright notice, this list of
|
||||
// conditions and the following disclaimer.
|
||||
//
|
||||
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
|
||||
// of conditions and the following disclaimer in the documentation and/or other
|
||||
// materials provided with the distribution.
|
||||
//
|
||||
// 3. Neither the name of the copyright holder nor the names of its contributors may be
|
||||
// used to endorse or promote products derived from this software without specific
|
||||
// prior written permission.
|
||||
//
|
||||
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
|
||||
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
|
||||
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
|
||||
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
|
||||
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
||||
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
|
||||
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
|
||||
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
|
||||
#pragma once
|
||||
|
||||
// Compatibility shim for oxenc includes
|
||||
|
||||
#include <oxenc/bt_value.h>
|
||||
|
||||
namespace oxenmq {
|
||||
|
||||
using oxenc::bt_value;
|
||||
using oxenc::bt_dict;
|
||||
using oxenc::bt_list;
|
||||
using oxenc::bt_variant;
|
||||
|
||||
using oxenc::has_alternative;
|
||||
using oxenc::has_alternative_v;
|
||||
|
||||
}
|
47
oxenmq/hex.h
47
oxenmq/hex.h
|
@ -1,47 +0,0 @@
|
|||
// Copyright (c) 2019-2021, The Oxen Project
|
||||
//
|
||||
// All rights reserved.
|
||||
//
|
||||
// Redistribution and use in source and binary forms, with or without modification, are
|
||||
// permitted provided that the following conditions are met:
|
||||
//
|
||||
// 1. Redistributions of source code must retain the above copyright notice, this list of
|
||||
// conditions and the following disclaimer.
|
||||
//
|
||||
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
|
||||
// of conditions and the following disclaimer in the documentation and/or other
|
||||
// materials provided with the distribution.
|
||||
//
|
||||
// 3. Neither the name of the copyright holder nor the names of its contributors may be
|
||||
// used to endorse or promote products derived from this software without specific
|
||||
// prior written permission.
|
||||
//
|
||||
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
|
||||
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
|
||||
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
|
||||
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
|
||||
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
||||
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
|
||||
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
|
||||
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
|
||||
#pragma once
|
||||
#include <oxenc/hex.h>
|
||||
|
||||
// Compatibility shim for oxenc includes
|
||||
|
||||
namespace oxenmq {
|
||||
|
||||
using oxenc::to_hex_size;
|
||||
using oxenc::from_hex_size;
|
||||
using oxenc::hex_encoder;
|
||||
using oxenc::to_hex;
|
||||
using oxenc::is_hex_digit;
|
||||
using oxenc::is_hex;
|
||||
using oxenc::hex_decoder;
|
||||
using oxenc::from_hex_digit;
|
||||
using oxenc::from_hex_pair;
|
||||
using oxenc::from_hex;
|
||||
|
||||
}
|
|
@ -431,7 +431,7 @@ OxenMQ::~OxenMQ() {
|
|||
// up, so signal them so that they can end themselves.
|
||||
{
|
||||
std::lock_guard lock{tagged_startup_mutex};
|
||||
tagged_go = true;
|
||||
tagged_go = tagged_go_mode::SHUTDOWN;
|
||||
}
|
||||
tagged_cv.notify_all();
|
||||
for (auto& [run, busy, queue] : tagged_workers)
|
||||
|
|
|
@ -788,7 +788,8 @@ private:
|
|||
/// then wait via this bool/c.v. to synchronize startup with the proxy thread. This mutex isn't
|
||||
/// used after startup is complete.
|
||||
std::mutex tagged_startup_mutex;
|
||||
bool tagged_go{false};
|
||||
enum class tagged_go_mode { WAIT, GO, SHUTDOWN };
|
||||
tagged_go_mode tagged_go = tagged_go_mode::WAIT;
|
||||
std::condition_variable tagged_cv;
|
||||
|
||||
public:
|
||||
|
|
|
@ -455,7 +455,7 @@ void OxenMQ::proxy_loop_init() {
|
|||
OMQ_LOG(debug, "Waiting for tagged workers");
|
||||
{
|
||||
std::unique_lock lock{tagged_startup_mutex};
|
||||
tagged_go = true;
|
||||
tagged_go = tagged_go_mode::GO;
|
||||
}
|
||||
tagged_cv.notify_all();
|
||||
std::unordered_set<std::string_view> waiting_on;
|
||||
|
|
|
@ -0,0 +1,172 @@
|
|||
#pragma once
|
||||
|
||||
#include "connections.h"
|
||||
|
||||
#include <chrono>
|
||||
#include <mutex>
|
||||
#include <shared_mutex>
|
||||
#include <type_traits>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
#include <optional>
|
||||
|
||||
namespace oxenmq {
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
namespace detail {
|
||||
struct no_data_t final {};
|
||||
inline constexpr no_data_t no_data{};
|
||||
|
||||
template <typename UserData>
|
||||
struct SubData {
|
||||
std::chrono::steady_clock::time_point expiry;
|
||||
UserData user_data;
|
||||
explicit SubData(std::chrono::steady_clock::time_point _exp)
|
||||
: expiry{_exp}, user_data{} {}
|
||||
};
|
||||
|
||||
template <>
|
||||
struct SubData<void> {
|
||||
std::chrono::steady_clock::time_point expiry;
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* OMQ Subscription class. Handles pub/sub connections such that the user only needs to call
|
||||
* methods to subscribe and publish.
|
||||
*
|
||||
* FIXME: do we want an unsubscribe, or is expiry / conn management sufficient?
|
||||
*
|
||||
* Type UserData can contain whatever information the user may need at publish time, for example if
|
||||
* the subscription is for logs the subscriber can specify log levels or categories, and the
|
||||
* publisher can choose to send or not based on those. The UserData type, if provided and non-void,
|
||||
* must be default constructible, must be comparable with ==, and must be movable.
|
||||
*/
|
||||
template <typename UserData = void>
|
||||
class Subscription {
|
||||
static constexpr bool have_user_data = !std::is_void_v<UserData>;
|
||||
using UserData_if_present = std::conditional_t<have_user_data, UserData, detail::no_data_t>;
|
||||
using subdata_t = detail::SubData<UserData>;
|
||||
|
||||
std::unordered_map<ConnectionID, subdata_t> subs;
|
||||
std::shared_mutex _mutex;
|
||||
const std::string description; // description of the sub for logging
|
||||
const std::chrono::milliseconds sub_duration; // extended by re-subscribe
|
||||
|
||||
public:
|
||||
|
||||
Subscription() = delete;
|
||||
Subscription(std::string description, std::chrono::milliseconds sub_duration = 30min)
|
||||
: description{std::move(description)}, sub_duration{sub_duration} {}
|
||||
|
||||
// returns true if new sub, false if refresh sub. throws on error. `data` will be checked
|
||||
// against the existing data: if there is existing data and it compares `==` to the given value,
|
||||
// false is returned (and the existing data is not replaced). Otherwise the given data gets
|
||||
// stored for this connection (replacing existing data, if present), and true is returned.
|
||||
bool subscribe(const ConnectionID& conn, UserData_if_present data) {
|
||||
std::unique_lock lock{_mutex};
|
||||
auto expiry = std::chrono::steady_clock::now() + sub_duration;
|
||||
auto [value, added] = subs.emplace(conn, subdata_t{expiry});
|
||||
if (added) {
|
||||
if constexpr (have_user_data)
|
||||
value->second.user_data = std::move(data);
|
||||
return true;
|
||||
}
|
||||
|
||||
value->second.expiry = expiry;
|
||||
|
||||
if constexpr (have_user_data) {
|
||||
// if user_data changed, consider it a new sub rather than refresh, and update
|
||||
// user_data in the mapped value.
|
||||
if (!(value->second.user_data == data)) {
|
||||
value->second.user_data = std::move(data);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// no-user-data version, only available for Subscription<void> (== Subscription without a
|
||||
// UserData type).
|
||||
template <bool enable = !have_user_data, std::enable_if_t<enable, int> = 0>
|
||||
bool subscribe(const ConnectionID& conn) {
|
||||
return subscribe(conn, detail::no_data);
|
||||
}
|
||||
|
||||
// unsubscribe a connection ID. return the user data, if a sub was present.
|
||||
template <bool enable = have_user_data, std::enable_if_t<enable, int> = 0>
|
||||
std::optional<UserData> unsubscribe(const ConnectionID& conn) {
|
||||
std::unique_lock lock{_mutex};
|
||||
|
||||
auto node = subs.extract(conn);
|
||||
if (!node.empty())
|
||||
return node.mapped().user_data;
|
||||
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
// no-user-data version, only available for Subscription<void> (== Subscription without a
|
||||
// UserData type).
|
||||
template <bool enable = !have_user_data, std::enable_if_t<enable, int> = 0>
|
||||
bool unsubscribe(const ConnectionID& conn) {
|
||||
std::unique_lock lock{_mutex};
|
||||
auto node = subs.extract(conn);
|
||||
return !node.empty(); // true if removed, false if wasn't present
|
||||
}
|
||||
|
||||
// force removal of expired subscriptions. removal will otherwise only happen on publish
|
||||
void remove_expired() {
|
||||
std::unique_lock lock{_mutex};
|
||||
auto now = std::chrono::steady_clock::now();
|
||||
for (auto itr = subs.begin(); itr != subs.end();) {
|
||||
if (itr->second.expiry < now)
|
||||
itr = subs.erase(itr);
|
||||
else
|
||||
itr++;
|
||||
}
|
||||
}
|
||||
|
||||
// Func is any callable which takes:
|
||||
// - (const ConnectionID&, const UserData&) for Subscription<UserData> with non-void UserData
|
||||
// - (const ConnectionID&) for Subscription<void>.
|
||||
template <typename Func>
|
||||
void publish(Func&& func) {
|
||||
std::vector<ConnectionID> to_remove;
|
||||
{
|
||||
std::shared_lock lock(_mutex);
|
||||
if (subs.empty())
|
||||
return;
|
||||
|
||||
auto now = std::chrono::steady_clock::now();
|
||||
|
||||
for (const auto& [conn, sub] : subs) {
|
||||
if (sub.expiry < now)
|
||||
to_remove.push_back(conn);
|
||||
else if constexpr (have_user_data)
|
||||
func(conn, sub.user_data);
|
||||
else
|
||||
func(conn);
|
||||
}
|
||||
}
|
||||
|
||||
if (to_remove.empty())
|
||||
return;
|
||||
|
||||
std::unique_lock lock{_mutex};
|
||||
auto now = std::chrono::steady_clock::now();
|
||||
for (auto& conn : to_remove) {
|
||||
auto it = subs.find(conn);
|
||||
if (it != subs.end() && it->second.expiry < now /* recheck: client might have resubscribed in between locks */) {
|
||||
subs.erase(it);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
|
||||
} // namespace oxenmq
|
||||
|
||||
// vim:sw=4:et
|
|
@ -1,5 +0,0 @@
|
|||
#pragma once
|
||||
|
||||
// Compatibility shim for oxenc includes
|
||||
|
||||
#include <oxenc/variant.h>
|
|
@ -71,9 +71,9 @@ void OxenMQ::worker_thread(unsigned int index, std::optional<std::string> tagged
|
|||
// is running).
|
||||
{
|
||||
std::unique_lock lock{tagged_startup_mutex};
|
||||
tagged_cv.wait(lock, [this] { return tagged_go; });
|
||||
tagged_cv.wait(lock, [this] { return tagged_go != tagged_go_mode::WAIT; });
|
||||
}
|
||||
if (!proxy_thread.joinable()) // OxenMQ destroyed without starting
|
||||
if (tagged_go == tagged_go_mode::SHUTDOWN) // OxenMQ destroyed without starting
|
||||
return;
|
||||
tagged_socket.emplace(context, zmq::socket_type::dealer);
|
||||
}
|
||||
|
|
|
@ -9,6 +9,7 @@ add_executable(tests
|
|||
test_commands.cpp
|
||||
test_failures.cpp
|
||||
test_inject.cpp
|
||||
test_pubsub.cpp
|
||||
test_requests.cpp
|
||||
test_socket_limit.cpp
|
||||
test_tagged_threads.cpp
|
||||
|
|
|
@ -0,0 +1,611 @@
|
|||
#include "common.h"
|
||||
#include "oxenmq/pubsub.h"
|
||||
|
||||
#include <oxenc/hex.h>
|
||||
|
||||
using namespace oxenmq;
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
TEST_CASE("sub OK", "[pubsub]") {
|
||||
std::string listen = random_localhost();
|
||||
OxenMQ server{
|
||||
"", "", // generate ephemeral keys
|
||||
false, // not a service node
|
||||
[](auto) { return ""; },
|
||||
};
|
||||
server.listen_curve(listen);
|
||||
|
||||
Subscription<> greetings{"greetings"};
|
||||
|
||||
std::atomic<bool> is_new{false};
|
||||
server.add_category("public", Access{AuthLevel::none});
|
||||
server.add_request_command("public", "greetings", [&](Message& m) {
|
||||
is_new = greetings.subscribe(m.conn);
|
||||
m.send_reply("OK");
|
||||
});
|
||||
server.start();
|
||||
|
||||
OxenMQ client(
|
||||
[](LogLevel, const char* file, int line, std::string msg) { std::cerr << file << ":" << line << " --C-- " << msg << "\n"; }
|
||||
);
|
||||
|
||||
std::atomic<int> reply_count{0};
|
||||
client.add_category("notify", Access{AuthLevel::none});
|
||||
client.add_command("notify", "greetings", [&](Message& m) {
|
||||
const auto& data = m.data;
|
||||
if (!data.size())
|
||||
{
|
||||
std::cerr << "client received public.greetings with empty data\n";
|
||||
return;
|
||||
}
|
||||
if (data[0] == "hello")
|
||||
reply_count++;
|
||||
});
|
||||
|
||||
client.start();
|
||||
|
||||
std::atomic<bool> connected{false}, failed{false};
|
||||
std::string pubkey;
|
||||
|
||||
auto c = client.connect_remote(address{listen, server.get_pubkey()},
|
||||
[&](auto conn) { pubkey = conn.pubkey(); connected = true; },
|
||||
[&](auto, auto) { failed = true; });
|
||||
|
||||
wait_for([&] { return connected || failed; });
|
||||
{
|
||||
auto lock = catch_lock();
|
||||
REQUIRE( connected );
|
||||
REQUIRE_FALSE( failed );
|
||||
REQUIRE( oxenc::to_hex(pubkey) == oxenc::to_hex(server.get_pubkey()) );
|
||||
}
|
||||
|
||||
std::atomic<bool> got_reply{false};
|
||||
bool success;
|
||||
std::vector<std::string> data;
|
||||
client.request(c, "public.greetings", [&](bool ok, std::vector<std::string> data_) {
|
||||
got_reply = true;
|
||||
success = ok;
|
||||
data = std::move(data_);
|
||||
});
|
||||
|
||||
reply_sleep();
|
||||
{
|
||||
auto lock = catch_lock();
|
||||
REQUIRE( got_reply.load() );
|
||||
REQUIRE( success );
|
||||
REQUIRE( data == std::vector<std::string>{{"OK"}} );
|
||||
}
|
||||
|
||||
greetings.publish([&](auto& conn) {
|
||||
server.send(conn, "notify.greetings", "hello");
|
||||
});
|
||||
|
||||
reply_sleep();
|
||||
{
|
||||
auto lock = catch_lock();
|
||||
REQUIRE( reply_count == 1 );
|
||||
}
|
||||
|
||||
greetings.publish([&](auto& conn) {
|
||||
server.send(conn, "notify.greetings", "hello");
|
||||
});
|
||||
|
||||
reply_sleep();
|
||||
{
|
||||
auto lock = catch_lock();
|
||||
REQUIRE( reply_count == 2 );
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
TEST_CASE("user data", "[pubsub]") {
|
||||
std::string listen = random_localhost();
|
||||
OxenMQ server{
|
||||
"", "", // generate ephemeral keys
|
||||
false, // not a service node
|
||||
[](auto) { return ""; },
|
||||
};
|
||||
server.listen_curve(listen);
|
||||
|
||||
Subscription<std::string> greetings{"greetings"};
|
||||
|
||||
std::atomic<bool> is_new{false};
|
||||
server.add_category("public", Access{AuthLevel::none});
|
||||
server.add_request_command("public", "greetings", [&](Message& m) {
|
||||
is_new = greetings.subscribe(m.conn, std::string{m.data[0]});
|
||||
m.send_reply("OK");
|
||||
});
|
||||
server.start();
|
||||
|
||||
OxenMQ client(
|
||||
[](LogLevel, const char* file, int line, std::string msg) { std::cerr << file << ":" << line << " --C-- " << msg << "\n"; }
|
||||
);
|
||||
|
||||
std::string response{"foo"};
|
||||
std::atomic<int> reply_count{0};
|
||||
std::atomic<int> foo_count{0};
|
||||
client.add_category("notify", Access{AuthLevel::none});
|
||||
client.add_command("notify", "greetings", [&](Message& m) {
|
||||
const auto& data = m.data;
|
||||
if (!data.size())
|
||||
{
|
||||
std::cerr << "client received public.greetings with empty data\n";
|
||||
return;
|
||||
}
|
||||
if (data[0] == response)
|
||||
reply_count++;
|
||||
if (data[0] == "foo")
|
||||
foo_count++;
|
||||
});
|
||||
|
||||
client.start();
|
||||
|
||||
std::atomic<bool> connected{false}, failed{false};
|
||||
std::string pubkey;
|
||||
|
||||
auto c = client.connect_remote(address{listen, server.get_pubkey()},
|
||||
[&](auto conn) { pubkey = conn.pubkey(); connected = true; },
|
||||
[&](auto, auto) { failed = true; });
|
||||
|
||||
wait_for([&] { return connected || failed; });
|
||||
{
|
||||
auto lock = catch_lock();
|
||||
REQUIRE( connected );
|
||||
REQUIRE_FALSE( failed );
|
||||
REQUIRE( oxenc::to_hex(pubkey) == oxenc::to_hex(server.get_pubkey()) );
|
||||
}
|
||||
|
||||
std::atomic<bool> got_reply{false};
|
||||
std::atomic<bool> success;
|
||||
std::vector<std::string> data;
|
||||
client.request(c, "public.greetings", [&](bool ok, std::vector<std::string> data_) {
|
||||
got_reply = true;
|
||||
success = ok;
|
||||
data = std::move(data_);
|
||||
}, response);
|
||||
|
||||
reply_sleep();
|
||||
{
|
||||
auto lock = catch_lock();
|
||||
REQUIRE( got_reply.load() );
|
||||
REQUIRE( success );
|
||||
REQUIRE( is_new );
|
||||
REQUIRE( data == std::vector<std::string>{{"OK"}} );
|
||||
}
|
||||
|
||||
got_reply = false;
|
||||
success = false;
|
||||
client.request(c, "public.greetings", [&](bool ok, std::vector<std::string> data_) {
|
||||
got_reply = true;
|
||||
success = ok;
|
||||
data = std::move(data_);
|
||||
}, response);
|
||||
|
||||
reply_sleep();
|
||||
{
|
||||
auto lock = catch_lock();
|
||||
REQUIRE( got_reply.load() );
|
||||
REQUIRE( success );
|
||||
REQUIRE_FALSE( is_new );
|
||||
REQUIRE( data == std::vector<std::string>{{"OK"}} );
|
||||
}
|
||||
|
||||
greetings.publish([&](auto& conn, std::string user) {
|
||||
server.send(conn, "notify.greetings", user);
|
||||
});
|
||||
|
||||
reply_sleep();
|
||||
{
|
||||
auto lock = catch_lock();
|
||||
REQUIRE( reply_count == 1 );
|
||||
REQUIRE( foo_count == 1 );
|
||||
}
|
||||
|
||||
got_reply = false;
|
||||
success = false;
|
||||
response = "bar";
|
||||
client.request(c, "public.greetings", [&](bool ok, std::vector<std::string> data_) {
|
||||
got_reply = true;
|
||||
success = ok;
|
||||
data = std::move(data_);
|
||||
}, response);
|
||||
|
||||
reply_sleep();
|
||||
{
|
||||
auto lock = catch_lock();
|
||||
REQUIRE( got_reply.load() );
|
||||
REQUIRE( success );
|
||||
REQUIRE( is_new );
|
||||
REQUIRE( data == std::vector<std::string>{{"OK"}} );
|
||||
}
|
||||
|
||||
greetings.publish([&](auto& conn, std::string user) {
|
||||
server.send(conn, "notify.greetings", user);
|
||||
});
|
||||
|
||||
reply_sleep();
|
||||
{
|
||||
auto lock = catch_lock();
|
||||
REQUIRE( reply_count == 2 );
|
||||
REQUIRE( foo_count == 1 );
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
TEST_CASE("unsubscribe", "[pubsub]") {
|
||||
std::string listen = random_localhost();
|
||||
OxenMQ server{
|
||||
"", "", // generate ephemeral keys
|
||||
false, // not a service node
|
||||
[](auto) { return ""; },
|
||||
};
|
||||
server.listen_curve(listen);
|
||||
|
||||
Subscription<> greetings{"greetings"};
|
||||
|
||||
std::atomic<bool> was_subbed{false};
|
||||
server.add_category("public", Access{AuthLevel::none});
|
||||
server.add_request_command("public", "greetings", [&](Message& m) {
|
||||
greetings.subscribe(m.conn);
|
||||
m.send_reply("OK");
|
||||
});
|
||||
server.add_request_command("public", "goodbye", [&](Message& m) {
|
||||
was_subbed = greetings.unsubscribe(m.conn);
|
||||
m.send_reply("OK");
|
||||
});
|
||||
server.start();
|
||||
|
||||
OxenMQ client(
|
||||
[](LogLevel, const char* file, int line, std::string msg) { std::cerr << file << ":" << line << " --C-- " << msg << "\n"; }
|
||||
);
|
||||
|
||||
std::atomic<int> reply_count{0};
|
||||
client.add_category("notify", Access{AuthLevel::none});
|
||||
client.add_command("notify", "greetings", [&](Message& m) {
|
||||
const auto& data = m.data;
|
||||
if (!data.size())
|
||||
{
|
||||
std::cerr << "client received public.greetings with empty data\n";
|
||||
return;
|
||||
}
|
||||
if (data[0] == "hello")
|
||||
reply_count++;
|
||||
});
|
||||
|
||||
client.start();
|
||||
|
||||
std::atomic<bool> connected{false}, failed{false};
|
||||
std::string pubkey;
|
||||
|
||||
auto c = client.connect_remote(address{listen, server.get_pubkey()},
|
||||
[&](auto conn) { pubkey = conn.pubkey(); connected = true; },
|
||||
[&](auto, auto) { failed = true; });
|
||||
|
||||
wait_for([&] { return connected || failed; });
|
||||
{
|
||||
auto lock = catch_lock();
|
||||
REQUIRE( connected );
|
||||
REQUIRE_FALSE( failed );
|
||||
REQUIRE( oxenc::to_hex(pubkey) == oxenc::to_hex(server.get_pubkey()) );
|
||||
}
|
||||
|
||||
std::atomic<bool> got_reply{false};
|
||||
std::atomic<bool> success;
|
||||
std::vector<std::string> data;
|
||||
client.request(c, "public.greetings", [&](bool ok, std::vector<std::string> data_) {
|
||||
got_reply = true;
|
||||
success = ok;
|
||||
data = std::move(data_);
|
||||
});
|
||||
|
||||
reply_sleep();
|
||||
{
|
||||
auto lock = catch_lock();
|
||||
REQUIRE( got_reply.load() );
|
||||
REQUIRE( success );
|
||||
REQUIRE( data == std::vector<std::string>{{"OK"}} );
|
||||
}
|
||||
|
||||
greetings.publish([&](auto& conn) {
|
||||
server.send(conn, "notify.greetings", "hello");
|
||||
});
|
||||
|
||||
reply_sleep();
|
||||
{
|
||||
auto lock = catch_lock();
|
||||
REQUIRE( reply_count == 1 );
|
||||
}
|
||||
|
||||
got_reply = false;
|
||||
success = false;
|
||||
client.request(c, "public.goodbye", [&](bool ok, std::vector<std::string> data_) {
|
||||
got_reply = true;
|
||||
success = ok;
|
||||
data = std::move(data_);
|
||||
});
|
||||
|
||||
reply_sleep();
|
||||
{
|
||||
auto lock = catch_lock();
|
||||
REQUIRE( got_reply.load() );
|
||||
REQUIRE( success );
|
||||
REQUIRE( data == std::vector<std::string>{{"OK"}} );
|
||||
REQUIRE( was_subbed );
|
||||
}
|
||||
|
||||
greetings.publish([&](auto& conn) {
|
||||
server.send(conn, "notify.greetings", "hello");
|
||||
});
|
||||
|
||||
reply_sleep();
|
||||
{
|
||||
auto lock = catch_lock();
|
||||
REQUIRE( reply_count == 1 );
|
||||
}
|
||||
|
||||
got_reply = false;
|
||||
success = false;
|
||||
client.request(c, "public.goodbye", [&](bool ok, std::vector<std::string> data_) {
|
||||
got_reply = true;
|
||||
success = ok;
|
||||
data = std::move(data_);
|
||||
});
|
||||
|
||||
reply_sleep();
|
||||
{
|
||||
auto lock = catch_lock();
|
||||
REQUIRE( got_reply.load() );
|
||||
REQUIRE( success );
|
||||
REQUIRE( data == std::vector<std::string>{{"OK"}} );
|
||||
REQUIRE( was_subbed == false);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
TEST_CASE("expire", "[pubsub]") {
|
||||
std::string listen = random_localhost();
|
||||
OxenMQ server{
|
||||
"", "", // generate ephemeral keys
|
||||
false, // not a service node
|
||||
[](auto) { return ""; },
|
||||
};
|
||||
server.listen_curve(listen);
|
||||
|
||||
Subscription<> greetings{"greetings", 250ms};
|
||||
|
||||
std::atomic<bool> was_subbed{false};
|
||||
server.add_category("public", Access{AuthLevel::none});
|
||||
server.add_request_command("public", "greetings", [&](Message& m) {
|
||||
greetings.subscribe(m.conn);
|
||||
m.send_reply("OK");
|
||||
});
|
||||
server.add_request_command("public", "goodbye", [&](Message& m) {
|
||||
was_subbed = greetings.unsubscribe(m.conn);
|
||||
m.send_reply("OK");
|
||||
});
|
||||
server.start();
|
||||
|
||||
OxenMQ client(
|
||||
[](LogLevel, const char* file, int line, std::string msg) { std::cerr << file << ":" << line << " --C-- " << msg << "\n"; }
|
||||
);
|
||||
|
||||
std::atomic<int> reply_count{0};
|
||||
client.add_category("notify", Access{AuthLevel::none});
|
||||
client.add_command("notify", "greetings", [&](Message& m) {
|
||||
const auto& data = m.data;
|
||||
if (!data.size())
|
||||
{
|
||||
std::cerr << "client received public.greetings with empty data\n";
|
||||
return;
|
||||
}
|
||||
if (data[0] == "hello")
|
||||
reply_count++;
|
||||
});
|
||||
|
||||
client.start();
|
||||
|
||||
std::atomic<bool> connected{false}, failed{false};
|
||||
std::string pubkey;
|
||||
|
||||
auto c = client.connect_remote(address{listen, server.get_pubkey()},
|
||||
[&](auto conn) { pubkey = conn.pubkey(); connected = true; },
|
||||
[&](auto, auto) { failed = true; });
|
||||
|
||||
wait_for([&] { return connected || failed; });
|
||||
{
|
||||
auto lock = catch_lock();
|
||||
REQUIRE( connected );
|
||||
REQUIRE_FALSE( failed );
|
||||
REQUIRE( oxenc::to_hex(pubkey) == oxenc::to_hex(server.get_pubkey()) );
|
||||
}
|
||||
|
||||
std::atomic<bool> got_reply{false};
|
||||
bool success;
|
||||
std::vector<std::string> data;
|
||||
client.request(c, "public.greetings", [&](bool ok, std::vector<std::string> data_) {
|
||||
got_reply = true;
|
||||
success = ok;
|
||||
data = std::move(data_);
|
||||
});
|
||||
|
||||
reply_sleep();
|
||||
{
|
||||
auto lock = catch_lock();
|
||||
REQUIRE( got_reply.load() );
|
||||
REQUIRE( success );
|
||||
REQUIRE( data == std::vector<std::string>{{"OK"}} );
|
||||
}
|
||||
|
||||
// should be expired by now
|
||||
std::this_thread::sleep_for(500ms);
|
||||
|
||||
greetings.remove_expired();
|
||||
|
||||
got_reply = false;
|
||||
success = false;
|
||||
client.request(c, "public.goodbye", [&](bool ok, std::vector<std::string> data_) {
|
||||
got_reply = true;
|
||||
success = ok;
|
||||
data = std::move(data_);
|
||||
});
|
||||
|
||||
reply_sleep();
|
||||
{
|
||||
auto lock = catch_lock();
|
||||
REQUIRE( got_reply.load() );
|
||||
REQUIRE( success );
|
||||
REQUIRE( data == std::vector<std::string>{{"OK"}} );
|
||||
REQUIRE( was_subbed == false);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
TEST_CASE("multiple subs", "[pubsub]") {
|
||||
std::string listen = random_localhost();
|
||||
OxenMQ server{
|
||||
"", "", // generate ephemeral keys
|
||||
false, // not a service node
|
||||
[](auto) { return ""; },
|
||||
};
|
||||
server.listen_curve(listen);
|
||||
|
||||
Subscription<> greetings{"greetings"};
|
||||
|
||||
std::atomic<bool> is_new{false};
|
||||
server.add_category("public", Access{AuthLevel::none});
|
||||
server.add_request_command("public", "greetings", [&](Message& m) {
|
||||
is_new = greetings.subscribe(m.conn);
|
||||
m.send_reply("OK");
|
||||
});
|
||||
server.start();
|
||||
|
||||
/* client 1 */
|
||||
std::atomic<int> reply_count_c1{0};
|
||||
std::atomic<bool> connected_c1{false}, failed_c1{false};
|
||||
std::atomic<bool> got_reply_c1{false};
|
||||
bool success_c1;
|
||||
std::vector<std::string> data_c1;
|
||||
std::string pubkey_c1;
|
||||
OxenMQ client1(
|
||||
[](LogLevel, const char* file, int line, std::string msg) { std::cerr << file << ":" << line << " --C-- " << msg << "\n"; }
|
||||
);
|
||||
|
||||
client1.add_category("notify", Access{AuthLevel::none});
|
||||
client1.add_command("notify", "greetings", [&](Message& m) {
|
||||
const auto& data = m.data;
|
||||
if (!data.size())
|
||||
{
|
||||
std::cerr << "client received public.greetings with empty data\n";
|
||||
return;
|
||||
}
|
||||
if (data[0] == "hello")
|
||||
reply_count_c1++;
|
||||
});
|
||||
|
||||
client1.start();
|
||||
|
||||
auto c1 = client1.connect_remote(address{listen, server.get_pubkey()},
|
||||
[&](auto conn) { pubkey_c1 = conn.pubkey(); connected_c1 = true; },
|
||||
[&](auto, auto) { failed_c1 = true; });
|
||||
|
||||
wait_for([&] { return connected_c1 || failed_c1; });
|
||||
{
|
||||
auto lock = catch_lock();
|
||||
REQUIRE( connected_c1 );
|
||||
REQUIRE_FALSE( failed_c1 );
|
||||
REQUIRE( oxenc::to_hex(pubkey_c1) == oxenc::to_hex(server.get_pubkey()) );
|
||||
}
|
||||
|
||||
client1.request(c1, "public.greetings", [&](bool ok, std::vector<std::string> data_) {
|
||||
got_reply_c1 = true;
|
||||
success_c1 = ok;
|
||||
data_c1 = std::move(data_);
|
||||
});
|
||||
|
||||
reply_sleep();
|
||||
{
|
||||
auto lock = catch_lock();
|
||||
REQUIRE( got_reply_c1.load() );
|
||||
REQUIRE( success_c1 );
|
||||
REQUIRE( data_c1 == std::vector<std::string>{{"OK"}} );
|
||||
}
|
||||
/* end client 1 */
|
||||
|
||||
/* client 2 */
|
||||
std::atomic<int> reply_count_c2{0};
|
||||
std::atomic<bool> connected_c2{false}, failed_c2{false};
|
||||
std::atomic<bool> got_reply_c2{false};
|
||||
bool success_c2;
|
||||
std::vector<std::string> data_c2;
|
||||
std::string pubkey_c2;
|
||||
OxenMQ client2(
|
||||
[](LogLevel, const char* file, int line, std::string msg) { std::cerr << file << ":" << line << " --C-- " << msg << "\n"; }
|
||||
);
|
||||
|
||||
client2.add_category("notify", Access{AuthLevel::none});
|
||||
client2.add_command("notify", "greetings", [&](Message& m) {
|
||||
const auto& data = m.data;
|
||||
if (!data.size())
|
||||
{
|
||||
std::cerr << "client received public.greetings with empty data\n";
|
||||
return;
|
||||
}
|
||||
if (data[0] == "hello")
|
||||
reply_count_c2++;
|
||||
});
|
||||
|
||||
client2.start();
|
||||
|
||||
auto c2 = client2.connect_remote(address{listen, server.get_pubkey()},
|
||||
[&](auto conn) { pubkey_c2 = conn.pubkey(); connected_c2 = true; },
|
||||
[&](auto, auto) { failed_c2 = true; });
|
||||
|
||||
wait_for([&] { return connected_c2 || failed_c2; });
|
||||
{
|
||||
auto lock = catch_lock();
|
||||
REQUIRE( connected_c2 );
|
||||
REQUIRE_FALSE( failed_c2 );
|
||||
REQUIRE( oxenc::to_hex(pubkey_c2) == oxenc::to_hex(server.get_pubkey()) );
|
||||
}
|
||||
|
||||
client2.request(c2, "public.greetings", [&](bool ok, std::vector<std::string> data_) {
|
||||
got_reply_c2 = true;
|
||||
success_c2 = ok;
|
||||
data_c2 = std::move(data_);
|
||||
});
|
||||
|
||||
reply_sleep();
|
||||
{
|
||||
auto lock = catch_lock();
|
||||
REQUIRE( got_reply_c2.load() );
|
||||
REQUIRE( success_c2 );
|
||||
REQUIRE( data_c2 == std::vector<std::string>{{"OK"}} );
|
||||
}
|
||||
/* end client2 */
|
||||
|
||||
greetings.publish([&](auto& conn) {
|
||||
server.send(conn, "notify.greetings", "hello");
|
||||
});
|
||||
|
||||
reply_sleep();
|
||||
{
|
||||
auto lock = catch_lock();
|
||||
REQUIRE( reply_count_c1 == 1 );
|
||||
REQUIRE( reply_count_c2 == 1 );
|
||||
}
|
||||
|
||||
greetings.publish([&](auto& conn) {
|
||||
server.send(conn, "notify.greetings", "hello");
|
||||
});
|
||||
|
||||
reply_sleep();
|
||||
{
|
||||
auto lock = catch_lock();
|
||||
REQUIRE( reply_count_c1 == 2 );
|
||||
REQUIRE( reply_count_c2 == 2 );
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
// vim:sw=4:et
|
Loading…
Reference in New Issue