From f4fad9c194017495fc5a162f6ea822e008034caf Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Tue, 10 Mar 2020 00:25:13 -0300 Subject: [PATCH] Fix problems on outgoing disconnect This removes two superfluous erases that occur during connection closing (the proxy_close_connection just above them already removes the element from `peers`), and also short-circuits the incoming message loop if our pollitems becomes stale so that we don't try to use a closed connection. It also fixes a bug in the outgoing connection index that was decrementing the wrong connection indices, leading to failures when trying to send on an existing connection after a disconnect. Also adds a test case (which fails before the changes in this commit) to test this. --- lokimq/lokimq.cpp | 15 ++++++++++++--- tests/test_connect.cpp | 43 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 3 deletions(-) diff --git a/lokimq/lokimq.cpp b/lokimq/lokimq.cpp index da3ae8a..937287c 100644 --- a/lokimq/lokimq.cpp +++ b/lokimq/lokimq.cpp @@ -924,12 +924,15 @@ void update_connection_indices(Container& c, size_t index, AccessIndex get_index it = c.erase(it); continue; } - if (index > i) + if (i > index) --i; ++it; } } +/// Closes outgoing connections and removes all references. Note that this will invalidate +/// iterators on the various connection containers - if you don't want that, delete it first so that +/// the container won't contain the element being deleted. void LokiMQ::proxy_close_connection(size_t index, std::chrono::milliseconds linger) { connections[index].setsockopt(ZMQ_LINGER, linger > 0ms ? linger.count() : 0); pollitems_stale = true; @@ -959,7 +962,6 @@ void LokiMQ::proxy_expire_idle_peers() { } LMQ_LOG(info, "Closing outgoing connection to ", it->first, ": idle timeout reached"); proxy_close_connection(info.conn_index, CLOSE_LINGER); - it = peers.erase(it); } else { ++it; } @@ -1186,6 +1188,14 @@ void LokiMQ::proxy_loop() { if (!proxy_handle_builtin(i, parts)) proxy_to_worker(i, parts); + + if (pollitems_stale) { + // If our items became stale then we may have just closed a connection and so our + // queue index maybe also be stale, so restart the proxy loop (so that we rebuild + // pollitems). + LMQ_TRACE("pollitems became stale; short-circuiting incoming message loop"); + break; + } } LMQ_TRACE("done proxy loop"); @@ -1902,7 +1912,6 @@ void LokiMQ::proxy_disconnect(ConnectionID conn, std::chrono::milliseconds linge if (peer.outgoing()) { LMQ_LOG(info, "Closing outgoing connection to ", conn); proxy_close_connection(peer.conn_index, linger); - peers.erase(it); return; } } diff --git a/tests/test_connect.cpp b/tests/test_connect.cpp index 4f9e2aa..5ff9cfb 100644 --- a/tests/test_connect.cpp +++ b/tests/test_connect.cpp @@ -1,5 +1,6 @@ #include "common.h" #include +#include extern "C" { #include } @@ -121,3 +122,45 @@ TEST_CASE("plain-text connections", "[plaintext][connect]") { } + +TEST_CASE("SN disconnections", "[connect][disconnect]") { + std::vector> lmq; + std::vector pubkey, privkey; + std::unordered_map conn; + for (int i = 0; i < 3; i++) { + pubkey.emplace_back(); + privkey.emplace_back(); + pubkey[i].resize(crypto_box_PUBLICKEYBYTES); + privkey[i].resize(crypto_box_SECRETKEYBYTES); + crypto_box_keypair(reinterpret_cast(&pubkey[i][0]), reinterpret_cast(&privkey[i][0])); + conn.emplace(pubkey[i], "tcp://127.0.0.1:" + std::to_string(4450 + i)); + } + std::atomic his{0}; + for (int i = 0; i < pubkey.size(); i++) { + lmq.push_back(std::make_unique( + pubkey[i], privkey[i], true, + [conn](auto pk) { auto it = conn.find((std::string) pk); if (it != conn.end()) return it->second; return ""s; }, + get_logger("S" + std::to_string(i) + "ยป ") + )); + auto& server = *lmq.back(); + server.log_level(LogLevel::debug); + + server.listen_curve(conn[pubkey[i]], [](auto /*ip*/, auto /*pk*/) { return Allow{AuthLevel::none, true}; }); + server.add_category("sn", Access{AuthLevel::none, true}) + .add_command("hi", [&](Message& m) { his++; }); + server.start(); + } + std::this_thread::sleep_for(50ms); + + lmq[0]->send(pubkey[1], "sn.hi"); + lmq[0]->send(pubkey[2], "sn.hi"); + std::this_thread::sleep_for(50ms); + lmq[2]->send(pubkey[0], "sn.hi"); + lmq[2]->send(pubkey[1], "sn.hi"); + lmq[1]->send(pubkey[0], "BYE"); + std::this_thread::sleep_for(50ms); + lmq[0]->send(pubkey[2], "sn.hi"); + std::this_thread::sleep_for(50ms); + + REQUIRE(his == 5); +}