Add better deferred reply capabilities to Message

This provides an interface for sending a reply to a message later (i.e.
after the Message& itself is no longer valid) by using a new
`send_later()` method of the Message instance that returns an object
that can properly route replies (and can outlive the Message it was
called on).

Intended use is:

    run_this_lambda_later([send=msg.send_later()] {
        send.reply("content");
    });

which is equivalent to:

    run_this_lambda_later([&msg] {
        msg.send_reply("content");
    });

except that it works properly even if the lambda is invoked beyond the
lifetime of `msg`.
This commit is contained in:
Jason Rhinelander 2021-01-15 12:20:58 -04:00
parent 86247bc5c7
commit 506bd65b05
3 changed files with 126 additions and 2 deletions

View file

@ -36,7 +36,7 @@ public:
/// If you want to send a non-strong reply even when the remote is a service node then add
/// an explicit `send_option::optional()` argument.
template <typename... Args>
void send_back(std::string_view, Args&&... args);
void send_back(std::string_view command, Args&&... args);
/// Sends a reply to a request. This takes no command: the command is always the built-in
/// "REPLY" command, followed by the unique reply tag, then any reply data parts. All other
@ -51,7 +51,47 @@ public:
/// Sends a request back to whomever sent this message. This is effectively a wrapper around
/// lmq.request() that takes care of setting up the recipient arguments.
template <typename ReplyCallback, typename... Args>
void send_request(std::string_view cmd, ReplyCallback&& callback, Args&&... args);
void send_request(std::string_view command, ReplyCallback&& callback, Args&&... args);
/** Class returned by `send_later()` that can be used to call `back()`, `reply()`, or
* `request()` beyond the lifetime of the Message instance as if calling `msg.send_back()`,
* `msg.send_reply()`, or `msg.send_request()`. For example:
*
* auto send = msg.send_later();
* // ... later, perhaps in a lambda or scheduled job:
* send.reply("content");
*
* is equivalent to
*
* msg.send_reply("content");
*
* except that it is valid even if `msg` is no longer valid.
*/
class DeferredSend {
public:
OxenMQ& oxenmq; ///< The owning OxenMQ object
ConnectionID conn; ///< The connection info for routing a reply; also contains the pubkey/sn status
std::string reply_tag; ///< If the invoked command is a request command this is the required reply tag that will be prepended by `reply()`.
explicit DeferredSend(Message& m) : oxenmq{m.oxenmq}, conn{m.conn}, reply_tag{m.reply_tag} {}
/// Equivalent to msg.send_back(...), but can be invoked later.
template <typename... Args>
void back(std::string_view command, Args&&... args) const;
/// Equivalent to msg.send_reply(...), but can be invoked later.
template <typename... Args>
void reply(Args&&... args) const;
/// Equivalent to msg.send_request(...), but can be invoked later.
template <typename ReplyCallback, typename... Args>
void request(std::string_view command, ReplyCallback&& callback, Args&&... args) const;
};
/// Returns a DeferredSend object that can be used to send replies to this message even if the
/// message expires. Typically this is used when sending a reply requires waiting on another
/// task to complete without needing to block the handler thread.
DeferredSend send_later() { return DeferredSend{*this}; }
};
}

View file

@ -1489,18 +1489,32 @@ template <typename... Args>
void Message::send_back(std::string_view command, Args&&... args) {
oxenmq.send(conn, command, send_option::optional{!conn.sn()}, std::forward<Args>(args)...);
}
template <typename... Args>
void Message::DeferredSend::back(std::string_view command, Args&&... args) const {
oxenmq.send(conn, command, send_option::optional{!conn.sn()}, std::forward<Args>(args)...);
}
template <typename... Args>
void Message::send_reply(Args&&... args) {
assert(!reply_tag.empty());
oxenmq.send(conn, "REPLY", reply_tag, send_option::optional{!conn.sn()}, std::forward<Args>(args)...);
}
template <typename... Args>
void Message::DeferredSend::reply(Args&&... args) const {
assert(!reply_tag.empty());
oxenmq.send(conn, "REPLY", reply_tag, send_option::optional{!conn.sn()}, std::forward<Args>(args)...);
}
template <typename Callback, typename... Args>
void Message::send_request(std::string_view cmd, Callback&& callback, Args&&... args) {
oxenmq.request(conn, cmd, std::forward<Callback>(callback),
send_option::optional{!conn.sn()}, std::forward<Args>(args)...);
}
template <typename Callback, typename... Args>
void Message::DeferredSend::request(std::string_view cmd, Callback&& callback, Args&&... args) const {
oxenmq.request(conn, cmd, std::forward<Callback>(callback),
send_option::optional{!conn.sn()}, std::forward<Args>(args)...);
}
// When log messages are invoked we strip out anything before this in the filename:
constexpr std::string_view LOG_PREFIX{"oxenmq/", 7};

View file

@ -437,3 +437,73 @@ TEST_CASE("data parts", "[send][data_parts]") {
REQUIRE( r == expected );
}
}
TEST_CASE("deferred replies", "[send][deferred]") {
std::string listen = random_localhost();
OxenMQ server{
"", "", // generate ephemeral keys
false, // not a service node
[](auto) { return ""; },
};
server.listen_curve(listen);
std::atomic<int> hellos{0}, his{0};
server.add_category("public", Access{AuthLevel::none});
server.add_request_command("public", "echo", [&](Message& m) {
std::string msg = m.data.empty() ? ""s : std::string{m.data.front()};
std::thread t{[send=m.send_later(), msg=std::move(msg)] {
{ auto lock = catch_lock(); INFO("sleeping"); }
std::this_thread::sleep_for(50ms);
{ auto lock = catch_lock(); INFO("sending"); }
send.reply(msg);
}};
t.detach();
});
server.set_general_threads(1);
server.start();
OxenMQ client(
[](LogLevel, const char* file, int line, std::string msg) { std::cerr << file << ":" << line << " --C-- " << msg << "\n"; }
);
//client.log_level(LogLevel::trace);
client.start();
std::atomic<bool> connected{false}, failed{false};
std::string pubkey;
auto c = client.connect_remote(address{listen, server.get_pubkey()},
[&](auto conn) { pubkey = conn.pubkey(); connected = true; },
[&](auto, auto) { failed = true; });
wait_for([&] { return connected || failed; });
{
auto lock = catch_lock();
REQUIRE( connected );
REQUIRE_FALSE( failed );
REQUIRE( to_hex(pubkey) == to_hex(server.get_pubkey()) );
}
std::unordered_set<std::string> replies;
std::mutex reply_mut;
std::vector<std::string> data;
for (auto str : {"hello", "world", "omg"})
client.request(c, "public.echo", [&](bool ok, std::vector<std::string> data_) {
std::lock_guard lock{reply_mut};
replies.insert(std::move(data_[0]));
}, str);
reply_sleep();
{
std::lock_guard lq{reply_mut};
auto lock = catch_lock();
REQUIRE( replies.size() == 0 ); // The server waits 50ms before sending, so we shouldn't have any reply yet
}
std::this_thread::sleep_for(60ms); // We're at least 70ms in now so the 50ms-delayed server responses should have arrived
{
std::lock_guard lq{reply_mut};
auto lock = catch_lock();
REQUIRE( replies == std::unordered_set<std::string>{{"hello", "world", "omg"}} );
}
}