mirror of https://github.com/oxen-io/oxen-mq.git
Make request timeout configurable
For example: lmq.request(conn, "some.method", callback, lokimq::request_timeout{5s}); will result in the callback being called with a failure if the response doesn't arrive within 5s. (If it still arrives, but after the failure callback, it gets dropped).
This commit is contained in:
parent
75750001ce
commit
8b6f6f498c
|
@ -74,9 +74,9 @@ static constexpr auto DEFAULT_SEND_KEEP_ALIVE = 30s;
|
|||
// The default timeout for connect_remote()
|
||||
static constexpr auto REMOTE_CONNECT_TIMEOUT = 10s;
|
||||
|
||||
// The minimum amount of time we wait for a reply to a REQUEST before calling the callback with
|
||||
// The amount of time we wait for a reply to a REQUEST before calling the callback with
|
||||
// `false` to signal a timeout.
|
||||
static constexpr auto REQUEST_TIMEOUT = 15s;
|
||||
static constexpr auto DEFAULT_REQUEST_TIMEOUT = 15s;
|
||||
|
||||
/// Maximum length of a category
|
||||
static constexpr size_t MAX_CATEGORY_LENGTH = 50;
|
||||
|
@ -1050,6 +1050,16 @@ struct keep_alive {
|
|||
explicit keep_alive(std::chrono::milliseconds time) : time{std::move(time)} {}
|
||||
};
|
||||
|
||||
/// Specifies the amount of time to wait before triggering a failure callback for a request. If a
|
||||
/// request reply arrives *after* the failure timeout has been triggered then it will be dropped.
|
||||
/// (This has no effect if specified on a non-request() call). Note that requests failures are only
|
||||
/// processed in the CONN_CHECK_INTERVAL timer, so it can be up to that much longer than the time
|
||||
/// specified here before a failure callback is invoked.
|
||||
struct request_timeout {
|
||||
std::chrono::milliseconds time;
|
||||
explicit request_timeout(std::chrono::milliseconds time) : time{std::move(time)} {}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
namespace detail {
|
||||
|
@ -1090,6 +1100,11 @@ inline void apply_send_option(bt_list&, bt_dict& control_data, const send_option
|
|||
control_data["keep_alive"] = timeout.time.count();
|
||||
}
|
||||
|
||||
/// `request_timeout` specialization: set the timeout time for a request
|
||||
inline void apply_send_option(bt_list&, bt_dict& control_data, const send_option::request_timeout& timeout) {
|
||||
control_data["request_timeout"] = timeout.time.count();
|
||||
}
|
||||
|
||||
|
||||
|
||||
/// Extracts a pubkey, SN status, and auth level from a zmq message received on a *listening*
|
||||
|
|
|
@ -33,6 +33,7 @@ void LokiMQ::proxy_send(bt_dict_consumer data) {
|
|||
// NB: bt_dict_consumer goes in alphabetical order
|
||||
string_view hint;
|
||||
std::chrono::milliseconds keep_alive{DEFAULT_SEND_KEEP_ALIVE};
|
||||
std::chrono::milliseconds request_timeout{DEFAULT_REQUEST_TIMEOUT};
|
||||
bool optional = false;
|
||||
bool incoming = false;
|
||||
bool request = false;
|
||||
|
@ -74,6 +75,8 @@ void LokiMQ::proxy_send(bt_dict_consumer data) {
|
|||
if (!data.skip_until("request_tag"))
|
||||
throw std::runtime_error("Internal error: received request without request_name");
|
||||
request_tag = data.consume_string();
|
||||
if (data.skip_until("request_timeout"))
|
||||
request_timeout = std::chrono::milliseconds{data.consume_integer<uint64_t>()};
|
||||
}
|
||||
if (!data.skip_until("send"))
|
||||
throw std::runtime_error("Internal error: Invalid proxy send command; send parts missing");
|
||||
|
@ -112,7 +115,7 @@ void LokiMQ::proxy_send(bt_dict_consumer data) {
|
|||
if (request) {
|
||||
LMQ_LOG(debug, "Added new pending request ", to_hex(request_tag));
|
||||
pending_requests.insert({ request_tag, {
|
||||
std::chrono::steady_clock::now() + REQUEST_TIMEOUT, std::move(*request_cbptr) }});
|
||||
std::chrono::steady_clock::now() + request_timeout, std::move(*request_cbptr) }});
|
||||
}
|
||||
|
||||
try {
|
||||
|
|
|
@ -129,3 +129,73 @@ TEST_CASE("request from server to client", "[requests]") {
|
|||
REQUIRE( data == std::vector<std::string>{{"123"}} );
|
||||
}
|
||||
}
|
||||
|
||||
TEST_CASE("request timeouts", "[requests][timeout]") {
|
||||
std::string listen = "tcp://127.0.0.1:5678";
|
||||
LokiMQ server{
|
||||
"", "", // generate ephemeral keys
|
||||
false, // not a service node
|
||||
[](auto) { return ""; },
|
||||
};
|
||||
server.listen_curve(listen);
|
||||
|
||||
std::atomic<int> hellos{0}, his{0};
|
||||
|
||||
server.add_category("public", Access{AuthLevel::none});
|
||||
server.add_request_command("public", "blackhole", [&](Message& m) { /* doesn't reply */ });
|
||||
server.start();
|
||||
|
||||
LokiMQ client(
|
||||
[](LogLevel, const char* file, int line, std::string msg) { std::cerr << file << ":" << line << " --C-- " << msg << "\n"; }
|
||||
);
|
||||
//client.log_level(LogLevel::trace);
|
||||
|
||||
client.CONN_CHECK_INTERVAL = 10ms; // impatience (don't set this low in production code)
|
||||
client.start();
|
||||
|
||||
std::atomic<bool> connected{false}, failed{false};
|
||||
std::string pubkey;
|
||||
|
||||
auto c = client.connect_remote(listen,
|
||||
[&](auto conn) { pubkey = conn.pubkey(); connected = true; },
|
||||
[&](auto, auto) { failed = true; },
|
||||
server.get_pubkey());
|
||||
|
||||
int i;
|
||||
for (i = 0; i < 5; i++) {
|
||||
if (connected.load())
|
||||
break;
|
||||
std::this_thread::sleep_for(50ms);
|
||||
}
|
||||
REQUIRE( connected.load() );
|
||||
REQUIRE( !failed.load() );
|
||||
REQUIRE( i <= 1 );
|
||||
REQUIRE( to_hex(pubkey) == to_hex(server.get_pubkey()) );
|
||||
|
||||
std::atomic<bool> got_triggered{false};
|
||||
bool success;
|
||||
std::vector<std::string> data;
|
||||
client.request(c, "public.blackhole", [&](bool ok, std::vector<std::string> data_) {
|
||||
got_triggered = true;
|
||||
success = ok;
|
||||
data = std::move(data_);
|
||||
},
|
||||
lokimq::send_option::request_timeout{30ms}
|
||||
);
|
||||
|
||||
std::atomic<bool> got_triggered2{false};
|
||||
client.request(c, "public.blackhole", [&](bool ok, std::vector<std::string> data_) {
|
||||
got_triggered = true;
|
||||
success = ok;
|
||||
data = std::move(data_);
|
||||
},
|
||||
lokimq::send_option::request_timeout{100ms}
|
||||
);
|
||||
|
||||
std::this_thread::sleep_for(50ms);
|
||||
REQUIRE( got_triggered.load() );
|
||||
REQUIRE_FALSE( success );
|
||||
REQUIRE( data.size() == 0 );
|
||||
|
||||
REQUIRE_FALSE( got_triggered2.load() );
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue