mirror of https://github.com/oxen-io/oxen-mq.git
Fix & add tests for send_option::data_parts(...)
data_parts() wasn't currently used anywhere, and was broken: it was calling bt_deserialize which was just wrong. This repurposes it to take iterators over strings (or string-like types) and append those parts as message parts. Also adds tests for it.
This commit is contained in:
parent
af189a8d72
commit
e5cf174b83
|
@ -32,6 +32,7 @@
|
||||||
#include <string_view>
|
#include <string_view>
|
||||||
#include <list>
|
#include <list>
|
||||||
#include <queue>
|
#include <queue>
|
||||||
|
#include <type_traits>
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
#include <unordered_set>
|
#include <unordered_set>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
|
@ -1263,9 +1264,10 @@ struct data_parts_impl {
|
||||||
data_parts_impl(InputIt begin, InputIt end) : begin{std::move(begin)}, end{std::move(end)} {}
|
data_parts_impl(InputIt begin, InputIt end) : begin{std::move(begin)}, end{std::move(end)} {}
|
||||||
};
|
};
|
||||||
|
|
||||||
/// Specifies an iterator pair of data options to send, for when the number of arguments to send()
|
/// Specifies an iterator pair of data parts to send, for when the number of arguments to send()
|
||||||
/// cannot be determined at compile time.
|
/// cannot be determined at compile time. The iterator pair must be over strings or string_view (or
|
||||||
template <typename InputIt>
|
/// something convertible to a string_view).
|
||||||
|
template <typename InputIt, typename = std::enable_if_t<std::is_convertible_v<decltype(*std::declval<InputIt>()), std::string_view>>>
|
||||||
data_parts_impl<InputIt> data_parts(InputIt begin, InputIt end) { return {std::move(begin), std::move(end)}; }
|
data_parts_impl<InputIt> data_parts(InputIt begin, InputIt end) { return {std::move(begin), std::move(end)}; }
|
||||||
|
|
||||||
/// Specifies a connection hint when passed in to send(). If there is no current connection to the
|
/// Specifies a connection hint when passed in to send(). If there is no current connection to the
|
||||||
|
@ -1395,7 +1397,7 @@ inline void apply_send_option(bt_list& parts, bt_dict&, std::string_view arg) {
|
||||||
template <typename InputIt>
|
template <typename InputIt>
|
||||||
void apply_send_option(bt_list& parts, bt_dict&, const send_option::data_parts_impl<InputIt> data) {
|
void apply_send_option(bt_list& parts, bt_dict&, const send_option::data_parts_impl<InputIt> data) {
|
||||||
for (auto it = data.begin; it != data.end; ++it)
|
for (auto it = data.begin; it != data.end; ++it)
|
||||||
parts.push_back(lokimq::bt_deserialize(*it));
|
parts.emplace_back(*it);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// `hint` specialization: sets the hint in the control data
|
/// `hint` specialization: sets the hint in the control data
|
||||||
|
|
|
@ -362,3 +362,78 @@ TEST_CASE("send failure callbacks", "[commands][queue_full]") {
|
||||||
REQUIRE( send_failures.load() > 0 );
|
REQUIRE( send_failures.load() > 0 );
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_CASE("data parts", "[send][data_parts]") {
|
||||||
|
std::string listen = "tcp://127.0.0.1:4567";
|
||||||
|
LokiMQ server{
|
||||||
|
"", "", // generate ephemeral keys
|
||||||
|
false, // not a service node
|
||||||
|
[](auto) { return ""; },
|
||||||
|
get_logger("S» "),
|
||||||
|
LogLevel::trace
|
||||||
|
};
|
||||||
|
server.listen_curve(listen);
|
||||||
|
|
||||||
|
std::mutex mut;
|
||||||
|
std::vector<std::string> r;
|
||||||
|
|
||||||
|
server.add_category("public", Access{AuthLevel::none});
|
||||||
|
server.add_command("public", "hello", [&](Message& m) {
|
||||||
|
std::lock_guard l{mut};
|
||||||
|
for (const auto& s : m.data)
|
||||||
|
r.emplace_back(s);
|
||||||
|
});
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
LokiMQ client{get_logger("C» "), LogLevel::trace};
|
||||||
|
client.start();
|
||||||
|
|
||||||
|
std::atomic<bool> got{false};
|
||||||
|
bool success = false, failed = false;
|
||||||
|
std::string pubkey;
|
||||||
|
|
||||||
|
auto c = client.connect_remote(address{listen, server.get_pubkey()},
|
||||||
|
[&](auto conn) { pubkey = conn.pubkey(); success = true; got = true; },
|
||||||
|
[&](auto conn, std::string_view) { failed = true; got = true; });
|
||||||
|
|
||||||
|
wait_for_conn(got);
|
||||||
|
{
|
||||||
|
auto lock = catch_lock();
|
||||||
|
REQUIRE( got );
|
||||||
|
REQUIRE( success );
|
||||||
|
REQUIRE_FALSE( failed );
|
||||||
|
REQUIRE( to_hex(pubkey) == to_hex(server.get_pubkey()) );
|
||||||
|
}
|
||||||
|
|
||||||
|
std::vector some_data{{"abc"s, "def"s, "omg123\0zzz"s}};
|
||||||
|
client.send(c, "public.hello", lokimq::send_option::data_parts(some_data.begin(), some_data.end()));
|
||||||
|
reply_sleep();
|
||||||
|
{
|
||||||
|
auto lock = catch_lock();
|
||||||
|
std::lock_guard l{mut};
|
||||||
|
REQUIRE( r == some_data );
|
||||||
|
r.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
std::vector some_data2{{"a"sv, "b"sv, "\0"sv}};
|
||||||
|
client.send(c, "public.hello",
|
||||||
|
"hi",
|
||||||
|
lokimq::send_option::data_parts(some_data2.begin(), some_data2.end()),
|
||||||
|
"another",
|
||||||
|
"string"sv,
|
||||||
|
lokimq::send_option::data_parts(some_data.begin(), some_data.end()));
|
||||||
|
|
||||||
|
std::vector<std::string> expected;
|
||||||
|
expected.push_back("hi");
|
||||||
|
expected.insert(expected.end(), some_data2.begin(), some_data2.end());
|
||||||
|
expected.push_back("another");
|
||||||
|
expected.push_back("string");
|
||||||
|
expected.insert(expected.end(), some_data.begin(), some_data.end());
|
||||||
|
|
||||||
|
reply_sleep();
|
||||||
|
{
|
||||||
|
auto lock = catch_lock();
|
||||||
|
std::lock_guard l{mut};
|
||||||
|
REQUIRE( r == expected );
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue