From 4a6bb3f70274e9cd62a40cd6a8316cbf46022dc8 Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Thu, 1 Jul 2021 00:37:55 -0300 Subject: [PATCH 1/2] Fix messages coming back on an outgoing connection The recent PR that revamped the connection IDs missed a case when connecting to service nodes where we store the SN pubkey in peers, but then fail to find the peer when we look it up by connection id. This adds the required tracking to fix that case (and adds a test that fails without the fix here). --- oxenmq/connections.cpp | 3 +++ oxenmq/oxenmq.h | 5 ++++ oxenmq/proxy.cpp | 4 ++- oxenmq/worker.cpp | 6 ++++- tests/test_connect.cpp | 58 ++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 74 insertions(+), 2 deletions(-) diff --git a/oxenmq/connections.cpp b/oxenmq/connections.cpp index 26aed93..57ec6fa 100644 --- a/oxenmq/connections.cpp +++ b/oxenmq/connections.cpp @@ -174,6 +174,7 @@ OxenMQ::proxy_connect_sn(std::string_view remote, std::string_view connect_hint, p.idle_expiry = keep_alive; p.activity(); connections_updated = true; + outgoing_sn_conns.emplace_hint(outgoing_sn_conns.end(), p.conn_id, ConnectionID{remote}); auto it = connections.emplace_hint(connections.end(), p.conn_id, std::move(socket)); return {&it->second, ""s}; @@ -217,6 +218,8 @@ void OxenMQ::proxy_close_connection(int64_t id, std::chrono::milliseconds linger it->second.set(zmq::sockopt::linger, linger > 0ms ? (int) linger.count() : 0); connections.erase(it); connections_updated = true; + + outgoing_sn_conns.erase(id); } void OxenMQ::proxy_expire_idle_peers() { diff --git a/oxenmq/oxenmq.h b/oxenmq/oxenmq.h index bc3536f..bcd0ce6 100644 --- a/oxenmq/oxenmq.h +++ b/oxenmq/oxenmq.h @@ -378,6 +378,11 @@ private: /// SN pubkey string. std::unordered_multimap peers; + /// For outgoing connections to service nodes `peers` contains the service node connection id, + /// but we sometimes need to be able to get the peer info from a numeric connection id (for + /// example, for incoming messages on a connection we made); this map lets us do that. + std::map outgoing_sn_conns; + /// The next ConnectionID value we should use (for outgoing, non-SN connections). std::atomic next_conn_id{1}; diff --git a/oxenmq/proxy.cpp b/oxenmq/proxy.cpp index db564e1..9252483 100644 --- a/oxenmq/proxy.cpp +++ b/oxenmq/proxy.cpp @@ -563,8 +563,10 @@ void OxenMQ::proxy_loop() { continue; } - if (!proxy_handle_builtin(id, sock, parts)) + if (!proxy_handle_builtin(id, sock, parts)) { + LMQ_LOG(warn, "proxying to worker from connection ", id); proxy_to_worker(id, sock, parts); + } if (connections_updated) { // If connections got updated then our points are stale, to restart the proxy loop; diff --git a/oxenmq/worker.cpp b/oxenmq/worker.cpp index acd8e99..ae5046f 100644 --- a/oxenmq/worker.cpp +++ b/oxenmq/worker.cpp @@ -283,7 +283,11 @@ void OxenMQ::proxy_to_worker(int64_t conn_id, zmq::socket_t& sock, std::vectorsecond) + : peers.find(conn_id); + if (it == peers.end()) { LMQ_LOG(warn, "Internal error: connection id ", conn_id, " not found"); return; diff --git a/tests/test_connect.cpp b/tests/test_connect.cpp index 723f184..0b4b846 100644 --- a/tests/test_connect.cpp +++ b/tests/test_connect.cpp @@ -446,3 +446,61 @@ TEST_CASE("SN single worker test", "[connect][worker]") { } } + +TEST_CASE("SN backchatter", "[connect][sn]") { + // When we have a SN connection A -> B and then B sends a message to A on that existing + // connection, A should see it as coming from B. + std::vector> omq; + std::vector pubkey, privkey; + std::unordered_map conn; + REQUIRE(sodium_init() != -1); + for (int i = 0; i < 2; i++) { + pubkey.emplace_back(); + privkey.emplace_back(); + pubkey[i].resize(crypto_box_PUBLICKEYBYTES); + privkey[i].resize(crypto_box_SECRETKEYBYTES); + crypto_box_keypair(reinterpret_cast(&pubkey[i][0]), reinterpret_cast(&privkey[i][0])); + conn.emplace(pubkey[i], random_localhost()); + } + + for (int i = 0; i < pubkey.size(); i++) { + omq.push_back(std::make_unique( + pubkey[i], privkey[i], true, + [conn](auto pk) { auto it = conn.find((std::string) pk); if (it != conn.end()) return it->second; return ""s; }, + get_logger("S" + std::to_string(i) + "ยป "), + LogLevel::trace + )); + auto& server = *omq.back(); + + server.listen_curve(conn[pubkey[i]]); + server.set_active_sns({pubkey.begin(), pubkey.end()}); + } + std::string f; + omq[0]->add_category("a", Access{AuthLevel::none, true}) + .add_command("a", [&](Message& m) { + m.oxenmq.send(m.conn, "b.b", "abc"); + //m.send_back("b.b", "abc"); + }) + .add_command("z", [&](Message& m) { + auto lock = catch_lock(); + f = m.data[0]; + }); + omq[1]->add_category("b", Access{AuthLevel::none, true}) + .add_command("b", [&](Message& m) { + { + auto lock = catch_lock(); + UNSCOPED_INFO("b.b from conn " << m.conn); + } + m.send_back("a.z", m.data[0]); + }); + + for (auto& server : omq) + server->start(); + + auto c = omq[1]->connect_sn(pubkey[0]); + omq[1]->send(c, "a.a"); + std::this_thread::sleep_for(50ms); + + auto lock = catch_lock(); + REQUIRE(f == "abc"); +} From 9e0d2e24f606dc48c9c7ccaeb434bf2267a44808 Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Thu, 1 Jul 2021 00:39:35 -0300 Subject: [PATCH 2/2] Add a single container version of `send_option::data_parts` `send_option::data_parts(mycontainer)` is now a shortcut for `send_option::data_parts(mycontainer.begin(), mycontainer.end())`. --- oxenmq/oxenmq.h | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/oxenmq/oxenmq.h b/oxenmq/oxenmq.h index bcd0ce6..6b49b35 100644 --- a/oxenmq/oxenmq.h +++ b/oxenmq/oxenmq.h @@ -1364,6 +1364,10 @@ struct data_parts_impl { template ()), std::string_view>>> data_parts_impl data_parts(InputIt begin, InputIt end) { return {std::move(begin), std::move(end)}; } +/// Shortcut for send_option::data_parts(container.begin(), container.end()) +template +auto data_parts(const Container& c) { return data_parts(c.begin(), c.end()); } + /// Specifies a connection hint when passed in to send(). If there is no current connection to the /// peer then the hint is used to save a call to the SNRemoteAddress to get the connection location. /// (Note that there is no guarantee that the given hint will be used or that a SNRemoteAddress call