Merge pull request #42 from jagerman/fix-backmsg-connid

Fix return messages on SN connections
This commit is contained in:
Jason Rhinelander 2021-07-01 01:00:19 -03:00 committed by GitHub
commit 3bb32a81ff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 78 additions and 2 deletions

View File

@ -174,6 +174,7 @@ OxenMQ::proxy_connect_sn(std::string_view remote, std::string_view connect_hint,
p.idle_expiry = keep_alive;
p.activity();
connections_updated = true;
outgoing_sn_conns.emplace_hint(outgoing_sn_conns.end(), p.conn_id, ConnectionID{remote});
auto it = connections.emplace_hint(connections.end(), p.conn_id, std::move(socket));
return {&it->second, ""s};
@ -217,6 +218,8 @@ void OxenMQ::proxy_close_connection(int64_t id, std::chrono::milliseconds linger
it->second.set(zmq::sockopt::linger, linger > 0ms ? (int) linger.count() : 0);
connections.erase(it);
connections_updated = true;
outgoing_sn_conns.erase(id);
}
void OxenMQ::proxy_expire_idle_peers() {

View File

@ -378,6 +378,11 @@ private:
/// SN pubkey string.
std::unordered_multimap<ConnectionID, peer_info> peers;
/// For outgoing connections to service nodes `peers` contains the service node connection id,
/// but we sometimes need to be able to get the peer info from a numeric connection id (for
/// example, for incoming messages on a connection we made); this map lets us do that.
std::map<int64_t, ConnectionID> outgoing_sn_conns;
/// The next ConnectionID value we should use (for outgoing, non-SN connections).
std::atomic<int64_t> next_conn_id{1};
@ -1359,6 +1364,10 @@ struct data_parts_impl {
template <typename InputIt, typename = std::enable_if_t<std::is_convertible_v<decltype(*std::declval<InputIt>()), std::string_view>>>
data_parts_impl<InputIt> data_parts(InputIt begin, InputIt end) { return {std::move(begin), std::move(end)}; }
/// Shortcut for send_option::data_parts(container.begin(), container.end())
template <typename Container>
auto data_parts(const Container& c) { return data_parts(c.begin(), c.end()); }
/// Specifies a connection hint when passed in to send(). If there is no current connection to the
/// peer then the hint is used to save a call to the SNRemoteAddress to get the connection location.
/// (Note that there is no guarantee that the given hint will be used or that a SNRemoteAddress call

View File

@ -563,8 +563,10 @@ void OxenMQ::proxy_loop() {
continue;
}
if (!proxy_handle_builtin(id, sock, parts))
if (!proxy_handle_builtin(id, sock, parts)) {
LMQ_LOG(warn, "proxying to worker from connection ", id);
proxy_to_worker(id, sock, parts);
}
if (connections_updated) {
// If connections got updated then our points are stale, to restart the proxy loop;

View File

@ -283,7 +283,11 @@ void OxenMQ::proxy_to_worker(int64_t conn_id, zmq::socket_t& sock, std::vector<z
if (!outgoing) tmp_peer.route = parts[0].to_string();
peer_info* peer = nullptr;
if (outgoing) {
auto it = peers.find(conn_id);
auto snit = outgoing_sn_conns.find(conn_id);
auto it = snit != outgoing_sn_conns.end()
? peers.find(snit->second)
: peers.find(conn_id);
if (it == peers.end()) {
LMQ_LOG(warn, "Internal error: connection id ", conn_id, " not found");
return;

View File

@ -446,3 +446,61 @@ TEST_CASE("SN single worker test", "[connect][worker]") {
}
}
TEST_CASE("SN backchatter", "[connect][sn]") {
// When we have a SN connection A -> B and then B sends a message to A on that existing
// connection, A should see it as coming from B.
std::vector<std::unique_ptr<OxenMQ>> omq;
std::vector<std::string> pubkey, privkey;
std::unordered_map<std::string, std::string> conn;
REQUIRE(sodium_init() != -1);
for (int i = 0; i < 2; i++) {
pubkey.emplace_back();
privkey.emplace_back();
pubkey[i].resize(crypto_box_PUBLICKEYBYTES);
privkey[i].resize(crypto_box_SECRETKEYBYTES);
crypto_box_keypair(reinterpret_cast<unsigned char*>(&pubkey[i][0]), reinterpret_cast<unsigned char*>(&privkey[i][0]));
conn.emplace(pubkey[i], random_localhost());
}
for (int i = 0; i < pubkey.size(); i++) {
omq.push_back(std::make_unique<OxenMQ>(
pubkey[i], privkey[i], true,
[conn](auto pk) { auto it = conn.find((std::string) pk); if (it != conn.end()) return it->second; return ""s; },
get_logger("S" + std::to_string(i) + "» "),
LogLevel::trace
));
auto& server = *omq.back();
server.listen_curve(conn[pubkey[i]]);
server.set_active_sns({pubkey.begin(), pubkey.end()});
}
std::string f;
omq[0]->add_category("a", Access{AuthLevel::none, true})
.add_command("a", [&](Message& m) {
m.oxenmq.send(m.conn, "b.b", "abc");
//m.send_back("b.b", "abc");
})
.add_command("z", [&](Message& m) {
auto lock = catch_lock();
f = m.data[0];
});
omq[1]->add_category("b", Access{AuthLevel::none, true})
.add_command("b", [&](Message& m) {
{
auto lock = catch_lock();
UNSCOPED_INFO("b.b from conn " << m.conn);
}
m.send_back("a.z", m.data[0]);
});
for (auto& server : omq)
server->start();
auto c = omq[1]->connect_sn(pubkey[0]);
omq[1]->send(c, "a.a");
std::this_thread::sleep_for(50ms);
auto lock = catch_lock();
REQUIRE(f == "abc");
}