From 8b6f6f498c1895cc7b3b611a19deef7bbb50e5d7 Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Mon, 23 Mar 2020 22:30:53 -0300 Subject: [PATCH] Make request timeout configurable For example: lmq.request(conn, "some.method", callback, lokimq::request_timeout{5s}); will result in the callback being called with a failure if the response doesn't arrive within 5s. (If it still arrives, but after the failure callback, it gets dropped). --- lokimq/lokimq.h | 19 +++++++++-- lokimq/proxy.cpp | 5 ++- tests/test_requests.cpp | 70 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 91 insertions(+), 3 deletions(-) diff --git a/lokimq/lokimq.h b/lokimq/lokimq.h index b44c781..a215755 100644 --- a/lokimq/lokimq.h +++ b/lokimq/lokimq.h @@ -74,9 +74,9 @@ static constexpr auto DEFAULT_SEND_KEEP_ALIVE = 30s; // The default timeout for connect_remote() static constexpr auto REMOTE_CONNECT_TIMEOUT = 10s; -// The minimum amount of time we wait for a reply to a REQUEST before calling the callback with +// The amount of time we wait for a reply to a REQUEST before calling the callback with // `false` to signal a timeout. -static constexpr auto REQUEST_TIMEOUT = 15s; +static constexpr auto DEFAULT_REQUEST_TIMEOUT = 15s; /// Maximum length of a category static constexpr size_t MAX_CATEGORY_LENGTH = 50; @@ -1050,6 +1050,16 @@ struct keep_alive { explicit keep_alive(std::chrono::milliseconds time) : time{std::move(time)} {} }; +/// Specifies the amount of time to wait before triggering a failure callback for a request. If a +/// request reply arrives *after* the failure timeout has been triggered then it will be dropped. +/// (This has no effect if specified on a non-request() call). Note that requests failures are only +/// processed in the CONN_CHECK_INTERVAL timer, so it can be up to that much longer than the time +/// specified here before a failure callback is invoked. +struct request_timeout { + std::chrono::milliseconds time; + explicit request_timeout(std::chrono::milliseconds time) : time{std::move(time)} {} +}; + } namespace detail { @@ -1090,6 +1100,11 @@ inline void apply_send_option(bt_list&, bt_dict& control_data, const send_option control_data["keep_alive"] = timeout.time.count(); } +/// `request_timeout` specialization: set the timeout time for a request +inline void apply_send_option(bt_list&, bt_dict& control_data, const send_option::request_timeout& timeout) { + control_data["request_timeout"] = timeout.time.count(); +} + /// Extracts a pubkey, SN status, and auth level from a zmq message received on a *listening* diff --git a/lokimq/proxy.cpp b/lokimq/proxy.cpp index 4183dbd..171ed9e 100644 --- a/lokimq/proxy.cpp +++ b/lokimq/proxy.cpp @@ -33,6 +33,7 @@ void LokiMQ::proxy_send(bt_dict_consumer data) { // NB: bt_dict_consumer goes in alphabetical order string_view hint; std::chrono::milliseconds keep_alive{DEFAULT_SEND_KEEP_ALIVE}; + std::chrono::milliseconds request_timeout{DEFAULT_REQUEST_TIMEOUT}; bool optional = false; bool incoming = false; bool request = false; @@ -74,6 +75,8 @@ void LokiMQ::proxy_send(bt_dict_consumer data) { if (!data.skip_until("request_tag")) throw std::runtime_error("Internal error: received request without request_name"); request_tag = data.consume_string(); + if (data.skip_until("request_timeout")) + request_timeout = std::chrono::milliseconds{data.consume_integer()}; } if (!data.skip_until("send")) throw std::runtime_error("Internal error: Invalid proxy send command; send parts missing"); @@ -112,7 +115,7 @@ void LokiMQ::proxy_send(bt_dict_consumer data) { if (request) { LMQ_LOG(debug, "Added new pending request ", to_hex(request_tag)); pending_requests.insert({ request_tag, { - std::chrono::steady_clock::now() + REQUEST_TIMEOUT, std::move(*request_cbptr) }}); + std::chrono::steady_clock::now() + request_timeout, std::move(*request_cbptr) }}); } try { diff --git a/tests/test_requests.cpp b/tests/test_requests.cpp index 6d274fe..53a4c68 100644 --- a/tests/test_requests.cpp +++ b/tests/test_requests.cpp @@ -129,3 +129,73 @@ TEST_CASE("request from server to client", "[requests]") { REQUIRE( data == std::vector{{"123"}} ); } } + +TEST_CASE("request timeouts", "[requests][timeout]") { + std::string listen = "tcp://127.0.0.1:5678"; + LokiMQ server{ + "", "", // generate ephemeral keys + false, // not a service node + [](auto) { return ""; }, + }; + server.listen_curve(listen); + + std::atomic hellos{0}, his{0}; + + server.add_category("public", Access{AuthLevel::none}); + server.add_request_command("public", "blackhole", [&](Message& m) { /* doesn't reply */ }); + server.start(); + + LokiMQ client( + [](LogLevel, const char* file, int line, std::string msg) { std::cerr << file << ":" << line << " --C-- " << msg << "\n"; } + ); + //client.log_level(LogLevel::trace); + + client.CONN_CHECK_INTERVAL = 10ms; // impatience (don't set this low in production code) + client.start(); + + std::atomic connected{false}, failed{false}; + std::string pubkey; + + auto c = client.connect_remote(listen, + [&](auto conn) { pubkey = conn.pubkey(); connected = true; }, + [&](auto, auto) { failed = true; }, + server.get_pubkey()); + + int i; + for (i = 0; i < 5; i++) { + if (connected.load()) + break; + std::this_thread::sleep_for(50ms); + } + REQUIRE( connected.load() ); + REQUIRE( !failed.load() ); + REQUIRE( i <= 1 ); + REQUIRE( to_hex(pubkey) == to_hex(server.get_pubkey()) ); + + std::atomic got_triggered{false}; + bool success; + std::vector data; + client.request(c, "public.blackhole", [&](bool ok, std::vector data_) { + got_triggered = true; + success = ok; + data = std::move(data_); + }, + lokimq::send_option::request_timeout{30ms} + ); + + std::atomic got_triggered2{false}; + client.request(c, "public.blackhole", [&](bool ok, std::vector data_) { + got_triggered = true; + success = ok; + data = std::move(data_); + }, + lokimq::send_option::request_timeout{100ms} + ); + + std::this_thread::sleep_for(50ms); + REQUIRE( got_triggered.load() ); + REQUIRE_FALSE( success ); + REQUIRE( data.size() == 0 ); + + REQUIRE_FALSE( got_triggered2.load() ); +}