From 506bd65b05b1cd5b11b4d1e3392306e4473f5ba3 Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Fri, 15 Jan 2021 12:20:58 -0400 Subject: [PATCH] Add better deferred reply capabilities to Message This provides an interface for sending a reply to a message later (i.e. after the Message& itself is no longer valid) by using a new `send_later()` method of the Message instance that returns an object that can properly route replies (and can outlive the Message it was called on). Intended use is: run_this_lambda_later([send=msg.send_later()] { send.reply("content"); }); which is equivalent to: run_this_lambda_later([&msg] { msg.send_reply("content"); }); except that it works properly even if the lambda is invoked beyond the lifetime of `msg`. --- oxenmq/message.h | 44 ++++++++++++++++++++++++-- oxenmq/oxenmq.h | 14 +++++++++ tests/test_commands.cpp | 70 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 126 insertions(+), 2 deletions(-) diff --git a/oxenmq/message.h b/oxenmq/message.h index d914561..3eb1afb 100644 --- a/oxenmq/message.h +++ b/oxenmq/message.h @@ -36,7 +36,7 @@ public: /// If you want to send a non-strong reply even when the remote is a service node then add /// an explicit `send_option::optional()` argument. template - void send_back(std::string_view, Args&&... args); + void send_back(std::string_view command, Args&&... args); /// Sends a reply to a request. This takes no command: the command is always the built-in /// "REPLY" command, followed by the unique reply tag, then any reply data parts. All other @@ -51,7 +51,47 @@ public: /// Sends a request back to whomever sent this message. This is effectively a wrapper around /// lmq.request() that takes care of setting up the recipient arguments. template - void send_request(std::string_view cmd, ReplyCallback&& callback, Args&&... args); + void send_request(std::string_view command, ReplyCallback&& callback, Args&&... args); + + /** Class returned by `send_later()` that can be used to call `back()`, `reply()`, or + * `request()` beyond the lifetime of the Message instance as if calling `msg.send_back()`, + * `msg.send_reply()`, or `msg.send_request()`. For example: + * + * auto send = msg.send_later(); + * // ... later, perhaps in a lambda or scheduled job: + * send.reply("content"); + * + * is equivalent to + * + * msg.send_reply("content"); + * + * except that it is valid even if `msg` is no longer valid. + */ + class DeferredSend { + public: + OxenMQ& oxenmq; ///< The owning OxenMQ object + ConnectionID conn; ///< The connection info for routing a reply; also contains the pubkey/sn status + std::string reply_tag; ///< If the invoked command is a request command this is the required reply tag that will be prepended by `reply()`. + + explicit DeferredSend(Message& m) : oxenmq{m.oxenmq}, conn{m.conn}, reply_tag{m.reply_tag} {} + + /// Equivalent to msg.send_back(...), but can be invoked later. + template + void back(std::string_view command, Args&&... args) const; + + /// Equivalent to msg.send_reply(...), but can be invoked later. + template + void reply(Args&&... args) const; + + /// Equivalent to msg.send_request(...), but can be invoked later. + template + void request(std::string_view command, ReplyCallback&& callback, Args&&... args) const; + }; + + /// Returns a DeferredSend object that can be used to send replies to this message even if the + /// message expires. Typically this is used when sending a reply requires waiting on another + /// task to complete without needing to block the handler thread. + DeferredSend send_later() { return DeferredSend{*this}; } }; } diff --git a/oxenmq/oxenmq.h b/oxenmq/oxenmq.h index 84cf037..bd03d1c 100644 --- a/oxenmq/oxenmq.h +++ b/oxenmq/oxenmq.h @@ -1489,18 +1489,32 @@ template void Message::send_back(std::string_view command, Args&&... args) { oxenmq.send(conn, command, send_option::optional{!conn.sn()}, std::forward(args)...); } +template +void Message::DeferredSend::back(std::string_view command, Args&&... args) const { + oxenmq.send(conn, command, send_option::optional{!conn.sn()}, std::forward(args)...); +} template void Message::send_reply(Args&&... args) { assert(!reply_tag.empty()); oxenmq.send(conn, "REPLY", reply_tag, send_option::optional{!conn.sn()}, std::forward(args)...); } +template +void Message::DeferredSend::reply(Args&&... args) const { + assert(!reply_tag.empty()); + oxenmq.send(conn, "REPLY", reply_tag, send_option::optional{!conn.sn()}, std::forward(args)...); +} template void Message::send_request(std::string_view cmd, Callback&& callback, Args&&... args) { oxenmq.request(conn, cmd, std::forward(callback), send_option::optional{!conn.sn()}, std::forward(args)...); } +template +void Message::DeferredSend::request(std::string_view cmd, Callback&& callback, Args&&... args) const { + oxenmq.request(conn, cmd, std::forward(callback), + send_option::optional{!conn.sn()}, std::forward(args)...); +} // When log messages are invoked we strip out anything before this in the filename: constexpr std::string_view LOG_PREFIX{"oxenmq/", 7}; diff --git a/tests/test_commands.cpp b/tests/test_commands.cpp index fe5c598..b05c39d 100644 --- a/tests/test_commands.cpp +++ b/tests/test_commands.cpp @@ -437,3 +437,73 @@ TEST_CASE("data parts", "[send][data_parts]") { REQUIRE( r == expected ); } } + +TEST_CASE("deferred replies", "[send][deferred]") { + std::string listen = random_localhost(); + OxenMQ server{ + "", "", // generate ephemeral keys + false, // not a service node + [](auto) { return ""; }, + }; + server.listen_curve(listen); + + std::atomic hellos{0}, his{0}; + + server.add_category("public", Access{AuthLevel::none}); + server.add_request_command("public", "echo", [&](Message& m) { + std::string msg = m.data.empty() ? ""s : std::string{m.data.front()}; + std::thread t{[send=m.send_later(), msg=std::move(msg)] { + { auto lock = catch_lock(); INFO("sleeping"); } + std::this_thread::sleep_for(50ms); + { auto lock = catch_lock(); INFO("sending"); } + send.reply(msg); + }}; + t.detach(); + }); + server.set_general_threads(1); + server.start(); + + OxenMQ client( + [](LogLevel, const char* file, int line, std::string msg) { std::cerr << file << ":" << line << " --C-- " << msg << "\n"; } + ); + //client.log_level(LogLevel::trace); + + client.start(); + + std::atomic connected{false}, failed{false}; + std::string pubkey; + + auto c = client.connect_remote(address{listen, server.get_pubkey()}, + [&](auto conn) { pubkey = conn.pubkey(); connected = true; }, + [&](auto, auto) { failed = true; }); + + wait_for([&] { return connected || failed; }); + { + auto lock = catch_lock(); + REQUIRE( connected ); + REQUIRE_FALSE( failed ); + REQUIRE( to_hex(pubkey) == to_hex(server.get_pubkey()) ); + } + + std::unordered_set replies; + std::mutex reply_mut; + std::vector data; + for (auto str : {"hello", "world", "omg"}) + client.request(c, "public.echo", [&](bool ok, std::vector data_) { + std::lock_guard lock{reply_mut}; + replies.insert(std::move(data_[0])); + }, str); + + reply_sleep(); + { + std::lock_guard lq{reply_mut}; + auto lock = catch_lock(); + REQUIRE( replies.size() == 0 ); // The server waits 50ms before sending, so we shouldn't have any reply yet + } + std::this_thread::sleep_for(60ms); // We're at least 70ms in now so the 50ms-delayed server responses should have arrived + { + std::lock_guard lq{reply_mut}; + auto lock = catch_lock(); + REQUIRE( replies == std::unordered_set{{"hello", "world", "omg"}} ); + } +}