Merge branch 'dev' into stable

This commit is contained in:
Jason Rhinelander 2021-03-09 15:49:50 -04:00
commit d570093f23
5 changed files with 130 additions and 6 deletions

View File

@ -22,7 +22,7 @@ include(GNUInstallDirs)
set(OXENMQ_VERSION_MAJOR 1)
set(OXENMQ_VERSION_MINOR 2)
set(OXENMQ_VERSION_PATCH 3)
set(OXENMQ_VERSION_PATCH 4)
set(OXENMQ_VERSION "${OXENMQ_VERSION_MAJOR}.${OXENMQ_VERSION_MINOR}.${OXENMQ_VERSION_PATCH}")
message(STATUS "oxenmq v${OXENMQ_VERSION}")

View File

@ -373,11 +373,11 @@ template <typename S, typename T>
struct bt_deserialize<std::pair<S, T>> : bt_deserialize_tuple<std::pair, S, T> {};
template <typename T>
constexpr bool is_bt_tuple = false;
inline constexpr bool is_bt_tuple = false;
template <typename... T>
constexpr bool is_bt_tuple<std::tuple<T...>> = true;
inline constexpr bool is_bt_tuple<std::tuple<T...>> = true;
template <typename S, typename T>
constexpr bool is_bt_tuple<std::pair<S, T>> = true;
inline constexpr bool is_bt_tuple<std::pair<S, T>> = true;
template <typename T>

View File

@ -36,7 +36,7 @@ public:
/// If you want to send a non-strong reply even when the remote is a service node then add
/// an explicit `send_option::optional()` argument.
template <typename... Args>
void send_back(std::string_view, Args&&... args);
void send_back(std::string_view command, Args&&... args);
/// Sends a reply to a request. This takes no command: the command is always the built-in
/// "REPLY" command, followed by the unique reply tag, then any reply data parts. All other
@ -51,7 +51,47 @@ public:
/// Sends a request back to whomever sent this message. This is effectively a wrapper around
/// lmq.request() that takes care of setting up the recipient arguments.
template <typename ReplyCallback, typename... Args>
void send_request(std::string_view cmd, ReplyCallback&& callback, Args&&... args);
void send_request(std::string_view command, ReplyCallback&& callback, Args&&... args);
/** Class returned by `send_later()` that can be used to call `back()`, `reply()`, or
* `request()` beyond the lifetime of the Message instance as if calling `msg.send_back()`,
* `msg.send_reply()`, or `msg.send_request()`. For example:
*
* auto send = msg.send_later();
* // ... later, perhaps in a lambda or scheduled job:
* send.reply("content");
*
* is equivalent to
*
* msg.send_reply("content");
*
* except that it is valid even if `msg` is no longer valid.
*/
class DeferredSend {
public:
OxenMQ& oxenmq; ///< The owning OxenMQ object
ConnectionID conn; ///< The connection info for routing a reply; also contains the pubkey/sn status
std::string reply_tag; ///< If the invoked command is a request command this is the required reply tag that will be prepended by `reply()`.
explicit DeferredSend(Message& m) : oxenmq{m.oxenmq}, conn{m.conn}, reply_tag{m.reply_tag} {}
/// Equivalent to msg.send_back(...), but can be invoked later.
template <typename... Args>
void back(std::string_view command, Args&&... args) const;
/// Equivalent to msg.send_reply(...), but can be invoked later.
template <typename... Args>
void reply(Args&&... args) const;
/// Equivalent to msg.send_request(...), but can be invoked later.
template <typename ReplyCallback, typename... Args>
void request(std::string_view command, ReplyCallback&& callback, Args&&... args) const;
};
/// Returns a DeferredSend object that can be used to send replies to this message even if the
/// message expires. Typically this is used when sending a reply requires waiting on another
/// task to complete without needing to block the handler thread.
DeferredSend send_later() { return DeferredSend{*this}; }
};
}

View File

@ -1489,18 +1489,32 @@ template <typename... Args>
void Message::send_back(std::string_view command, Args&&... args) {
oxenmq.send(conn, command, send_option::optional{!conn.sn()}, std::forward<Args>(args)...);
}
template <typename... Args>
void Message::DeferredSend::back(std::string_view command, Args&&... args) const {
oxenmq.send(conn, command, send_option::optional{!conn.sn()}, std::forward<Args>(args)...);
}
template <typename... Args>
void Message::send_reply(Args&&... args) {
assert(!reply_tag.empty());
oxenmq.send(conn, "REPLY", reply_tag, send_option::optional{!conn.sn()}, std::forward<Args>(args)...);
}
template <typename... Args>
void Message::DeferredSend::reply(Args&&... args) const {
assert(!reply_tag.empty());
oxenmq.send(conn, "REPLY", reply_tag, send_option::optional{!conn.sn()}, std::forward<Args>(args)...);
}
template <typename Callback, typename... Args>
void Message::send_request(std::string_view cmd, Callback&& callback, Args&&... args) {
oxenmq.request(conn, cmd, std::forward<Callback>(callback),
send_option::optional{!conn.sn()}, std::forward<Args>(args)...);
}
template <typename Callback, typename... Args>
void Message::DeferredSend::request(std::string_view cmd, Callback&& callback, Args&&... args) const {
oxenmq.request(conn, cmd, std::forward<Callback>(callback),
send_option::optional{!conn.sn()}, std::forward<Args>(args)...);
}
// When log messages are invoked we strip out anything before this in the filename:
constexpr std::string_view LOG_PREFIX{"oxenmq/", 7};

View File

@ -437,3 +437,73 @@ TEST_CASE("data parts", "[send][data_parts]") {
REQUIRE( r == expected );
}
}
TEST_CASE("deferred replies", "[send][deferred]") {
std::string listen = random_localhost();
OxenMQ server{
"", "", // generate ephemeral keys
false, // not a service node
[](auto) { return ""; },
};
server.listen_curve(listen);
std::atomic<int> hellos{0}, his{0};
server.add_category("public", Access{AuthLevel::none});
server.add_request_command("public", "echo", [&](Message& m) {
std::string msg = m.data.empty() ? ""s : std::string{m.data.front()};
std::thread t{[send=m.send_later(), msg=std::move(msg)] {
{ auto lock = catch_lock(); INFO("sleeping"); }
std::this_thread::sleep_for(50ms);
{ auto lock = catch_lock(); INFO("sending"); }
send.reply(msg);
}};
t.detach();
});
server.set_general_threads(1);
server.start();
OxenMQ client(
[](LogLevel, const char* file, int line, std::string msg) { std::cerr << file << ":" << line << " --C-- " << msg << "\n"; }
);
//client.log_level(LogLevel::trace);
client.start();
std::atomic<bool> connected{false}, failed{false};
std::string pubkey;
auto c = client.connect_remote(address{listen, server.get_pubkey()},
[&](auto conn) { pubkey = conn.pubkey(); connected = true; },
[&](auto, auto) { failed = true; });
wait_for([&] { return connected || failed; });
{
auto lock = catch_lock();
REQUIRE( connected );
REQUIRE_FALSE( failed );
REQUIRE( to_hex(pubkey) == to_hex(server.get_pubkey()) );
}
std::unordered_set<std::string> replies;
std::mutex reply_mut;
std::vector<std::string> data;
for (auto str : {"hello", "world", "omg"})
client.request(c, "public.echo", [&](bool ok, std::vector<std::string> data_) {
std::lock_guard lock{reply_mut};
replies.insert(std::move(data_[0]));
}, str);
reply_sleep();
{
std::lock_guard lq{reply_mut};
auto lock = catch_lock();
REQUIRE( replies.size() == 0 ); // The server waits 50ms before sending, so we shouldn't have any reply yet
}
std::this_thread::sleep_for(60ms); // We're at least 70ms in now so the 50ms-delayed server responses should have arrived
{
std::lock_guard lq{reply_mut};
auto lock = catch_lock();
REQUIRE( replies == std::unordered_set<std::string>{{"hello", "world", "omg"}} );
}
}