mirror of https://github.com/oxen-io/oxen-mq.git
Fix problems on outgoing disconnect
This removes two superfluous erases that occur during connection closing (the proxy_close_connection just above them already removes the element from `peers`), and also short-circuits the incoming message loop if our pollitems becomes stale so that we don't try to use a closed connection. It also fixes a bug in the outgoing connection index that was decrementing the wrong connection indices, leading to failures when trying to send on an existing connection after a disconnect. Also adds a test case (which fails before the changes in this commit) to test this.
This commit is contained in:
parent
512086613a
commit
f4fad9c194
|
@ -924,12 +924,15 @@ void update_connection_indices(Container& c, size_t index, AccessIndex get_index
|
||||||
it = c.erase(it);
|
it = c.erase(it);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (index > i)
|
if (i > index)
|
||||||
--i;
|
--i;
|
||||||
++it;
|
++it;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Closes outgoing connections and removes all references. Note that this will invalidate
|
||||||
|
/// iterators on the various connection containers - if you don't want that, delete it first so that
|
||||||
|
/// the container won't contain the element being deleted.
|
||||||
void LokiMQ::proxy_close_connection(size_t index, std::chrono::milliseconds linger) {
|
void LokiMQ::proxy_close_connection(size_t index, std::chrono::milliseconds linger) {
|
||||||
connections[index].setsockopt<int>(ZMQ_LINGER, linger > 0ms ? linger.count() : 0);
|
connections[index].setsockopt<int>(ZMQ_LINGER, linger > 0ms ? linger.count() : 0);
|
||||||
pollitems_stale = true;
|
pollitems_stale = true;
|
||||||
|
@ -959,7 +962,6 @@ void LokiMQ::proxy_expire_idle_peers() {
|
||||||
}
|
}
|
||||||
LMQ_LOG(info, "Closing outgoing connection to ", it->first, ": idle timeout reached");
|
LMQ_LOG(info, "Closing outgoing connection to ", it->first, ": idle timeout reached");
|
||||||
proxy_close_connection(info.conn_index, CLOSE_LINGER);
|
proxy_close_connection(info.conn_index, CLOSE_LINGER);
|
||||||
it = peers.erase(it);
|
|
||||||
} else {
|
} else {
|
||||||
++it;
|
++it;
|
||||||
}
|
}
|
||||||
|
@ -1186,6 +1188,14 @@ void LokiMQ::proxy_loop() {
|
||||||
|
|
||||||
if (!proxy_handle_builtin(i, parts))
|
if (!proxy_handle_builtin(i, parts))
|
||||||
proxy_to_worker(i, parts);
|
proxy_to_worker(i, parts);
|
||||||
|
|
||||||
|
if (pollitems_stale) {
|
||||||
|
// If our items became stale then we may have just closed a connection and so our
|
||||||
|
// queue index maybe also be stale, so restart the proxy loop (so that we rebuild
|
||||||
|
// pollitems).
|
||||||
|
LMQ_TRACE("pollitems became stale; short-circuiting incoming message loop");
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
LMQ_TRACE("done proxy loop");
|
LMQ_TRACE("done proxy loop");
|
||||||
|
@ -1902,7 +1912,6 @@ void LokiMQ::proxy_disconnect(ConnectionID conn, std::chrono::milliseconds linge
|
||||||
if (peer.outgoing()) {
|
if (peer.outgoing()) {
|
||||||
LMQ_LOG(info, "Closing outgoing connection to ", conn);
|
LMQ_LOG(info, "Closing outgoing connection to ", conn);
|
||||||
proxy_close_connection(peer.conn_index, linger);
|
proxy_close_connection(peer.conn_index, linger);
|
||||||
peers.erase(it);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
#include "common.h"
|
#include "common.h"
|
||||||
#include <future>
|
#include <future>
|
||||||
|
#include <lokimq/hex.h>
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#include <sodium.h>
|
#include <sodium.h>
|
||||||
}
|
}
|
||||||
|
@ -121,3 +122,45 @@ TEST_CASE("plain-text connections", "[plaintext][connect]") {
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_CASE("SN disconnections", "[connect][disconnect]") {
|
||||||
|
std::vector<std::unique_ptr<LokiMQ>> lmq;
|
||||||
|
std::vector<std::string> pubkey, privkey;
|
||||||
|
std::unordered_map<std::string, std::string> conn;
|
||||||
|
for (int i = 0; i < 3; i++) {
|
||||||
|
pubkey.emplace_back();
|
||||||
|
privkey.emplace_back();
|
||||||
|
pubkey[i].resize(crypto_box_PUBLICKEYBYTES);
|
||||||
|
privkey[i].resize(crypto_box_SECRETKEYBYTES);
|
||||||
|
crypto_box_keypair(reinterpret_cast<unsigned char*>(&pubkey[i][0]), reinterpret_cast<unsigned char*>(&privkey[i][0]));
|
||||||
|
conn.emplace(pubkey[i], "tcp://127.0.0.1:" + std::to_string(4450 + i));
|
||||||
|
}
|
||||||
|
std::atomic<int> his{0};
|
||||||
|
for (int i = 0; i < pubkey.size(); i++) {
|
||||||
|
lmq.push_back(std::make_unique<LokiMQ>(
|
||||||
|
pubkey[i], privkey[i], true,
|
||||||
|
[conn](auto pk) { auto it = conn.find((std::string) pk); if (it != conn.end()) return it->second; return ""s; },
|
||||||
|
get_logger("S" + std::to_string(i) + "» ")
|
||||||
|
));
|
||||||
|
auto& server = *lmq.back();
|
||||||
|
server.log_level(LogLevel::debug);
|
||||||
|
|
||||||
|
server.listen_curve(conn[pubkey[i]], [](auto /*ip*/, auto /*pk*/) { return Allow{AuthLevel::none, true}; });
|
||||||
|
server.add_category("sn", Access{AuthLevel::none, true})
|
||||||
|
.add_command("hi", [&](Message& m) { his++; });
|
||||||
|
server.start();
|
||||||
|
}
|
||||||
|
std::this_thread::sleep_for(50ms);
|
||||||
|
|
||||||
|
lmq[0]->send(pubkey[1], "sn.hi");
|
||||||
|
lmq[0]->send(pubkey[2], "sn.hi");
|
||||||
|
std::this_thread::sleep_for(50ms);
|
||||||
|
lmq[2]->send(pubkey[0], "sn.hi");
|
||||||
|
lmq[2]->send(pubkey[1], "sn.hi");
|
||||||
|
lmq[1]->send(pubkey[0], "BYE");
|
||||||
|
std::this_thread::sleep_for(50ms);
|
||||||
|
lmq[0]->send(pubkey[2], "sn.hi");
|
||||||
|
std::this_thread::sleep_for(50ms);
|
||||||
|
|
||||||
|
REQUIRE(his == 5);
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue