#include "common.h" extern "C" { #include } TEST_CASE("connections with curve authentication", "[curve][connect]") { std::string listen = random_localhost(); OxenMQ server{ "", "", // generate ephemeral keys false, // not a service node [](auto) { return ""; }, get_logger("S» "), LogLevel::trace }; server.listen_curve(listen); server.add_category("public", Access{AuthLevel::none}); server.add_request_command("public", "hello", [&](Message& m) { m.send_reply("hi"); }); server.start(); OxenMQ client{get_logger("C» "), LogLevel::trace}; client.start(); auto pubkey = server.get_pubkey(); std::atomic got{false}; bool success = false; auto server_conn = client.connect_remote(address{listen, pubkey}, [&](auto conn) { success = true; got = true; }, [&](auto conn, std::string_view reason) { auto lock = catch_lock(); INFO("connection failed: " << reason); got = true; }); wait_for_conn(got); { auto lock = catch_lock(); REQUIRE( got ); REQUIRE( success ); } success = false; std::vector parts; client.request(server_conn, "public.hello", [&](auto success_, auto parts_) { success = success_; parts = parts_; }); reply_sleep(); { auto lock = catch_lock(); REQUIRE( success ); } } TEST_CASE("self-connection SN optimization", "[connect][self]") { std::string pubkey, privkey; pubkey.resize(crypto_box_PUBLICKEYBYTES); privkey.resize(crypto_box_SECRETKEYBYTES); REQUIRE(sodium_init() != -1); auto listen_addr = random_localhost(); crypto_box_keypair(reinterpret_cast(&pubkey[0]), reinterpret_cast(&privkey[0])); OxenMQ sn{ pubkey, privkey, true, [&](auto pk) { if (pk == pubkey) return listen_addr; else return ""s; }, get_logger("S» "), LogLevel::trace }; sn.listen_curve(listen_addr, [&](auto ip, auto pk, auto sn) { auto lock = catch_lock(); REQUIRE(ip == "127.0.0.1"); REQUIRE(sn == (pk == pubkey)); return AuthLevel::none; }); sn.add_category("a", Access{AuthLevel::none}); std::atomic invoked{false}; sn.add_command("a", "b", [&](const Message& m) { invoked = true; auto lock = catch_lock(); REQUIRE(m.conn.sn()); REQUIRE(m.conn.pubkey() == pubkey); REQUIRE(!m.data.empty()); REQUIRE(m.data[0] == "my data"); }); sn.set_active_sns({{pubkey}}); sn.start(); sn.send(pubkey, "a.b", "my data"); wait_for_conn(invoked); { auto lock = catch_lock(); REQUIRE(invoked); } } TEST_CASE("plain-text connections", "[plaintext][connect]") { std::string listen = random_localhost(); OxenMQ server{get_logger("S» "), LogLevel::trace}; server.add_category("public", Access{AuthLevel::none}); server.add_request_command("public", "hello", [&](Message& m) { m.send_reply("hi"); }); server.listen_plain(listen); server.start(); OxenMQ client{get_logger("C» "), LogLevel::trace}; client.start(); std::atomic got{false}; bool success = false; auto c = client.connect_remote(address{listen}, [&](auto conn) { success = true; got = true; }, [&](auto conn, std::string_view reason) { auto lock = catch_lock(); INFO("connection failed: " << reason); got = true; } ); wait_for_conn(got); { auto lock = catch_lock(); REQUIRE( got ); REQUIRE( success ); } success = false; std::vector parts; client.request(c, "public.hello", [&](auto success_, auto parts_) { success = success_; parts = parts_; }); reply_sleep(); { auto lock = catch_lock(); REQUIRE( success ); } } TEST_CASE("post-start listening", "[connect][listen]") { OxenMQ server{get_logger("S» "), LogLevel::trace}; server.add_category("x", AuthLevel::none) .add_request_command("y", [&](Message& m) { m.send_reply("hi", m.data[0]); }); server.start(); std::atomic listens = 0; auto listen_curve = random_localhost(); server.listen_curve(listen_curve, nullptr, [&](bool success) { if (success) listens++; }); auto listen_plain = random_localhost(); server.listen_plain(listen_plain, nullptr, [&](bool success) { if (success) listens += 10; }); wait_for([&] { return listens.load() >= 11; }); { auto lock = catch_lock(); REQUIRE( listens == 11 ); } // This should fail since we're already listening on it: server.listen_curve(listen_plain, nullptr, [&](bool success) { if (!success) listens++; }); wait_for([&] { return listens.load() >= 12; }); { auto lock = catch_lock(); REQUIRE( listens == 12 ); } OxenMQ client{get_logger("C1» "), LogLevel::trace}; client.start(); std::atomic conns = 0; auto c1 = client.connect_remote(address{listen_curve, server.get_pubkey()}, [&](auto) { conns++; }, [&](auto, auto why) { auto lock = catch_lock(); UNSCOPED_INFO("connection failed: " << why); }); auto c2 = client.connect_remote(address{listen_plain}, [&](auto) { conns += 10; }, [&](auto, auto why) { auto lock = catch_lock(); UNSCOPED_INFO("connection failed: " << why); }); wait_for([&] { return conns.load() >= 11; }); { auto lock = catch_lock(); REQUIRE( conns == 11 ); } std::atomic replies = 0; std::string reply1, reply2; client.request(c1, "x.y", [&](auto success, auto parts) { replies++; for (auto& p : parts) reply1 += p; }, " world"); client.request(c2, "x.y", [&](auto success, auto parts) { replies += 10; for (auto& p : parts) reply2 += p; }, " cat"); wait_for([&] { return replies.load() >= 11; }); { auto lock = catch_lock(); REQUIRE( replies == 11 ); REQUIRE( reply1 == "hi world" ); REQUIRE( reply2 == "hi cat" ); } } TEST_CASE("unique connection IDs", "[connect][id]") { std::string listen = random_localhost(); OxenMQ server{get_logger("S» "), LogLevel::trace}; ConnectionID first, second; server.add_category("x", Access{AuthLevel::none}) .add_request_command("x", [&](Message& m) { first = m.conn; m.send_reply("hi"); }) .add_request_command("y", [&](Message& m) { second = m.conn; m.send_reply("hi"); }) ; server.listen_plain(listen); server.start(); OxenMQ client1{get_logger("C1» "), LogLevel::trace}; OxenMQ client2{get_logger("C2» "), LogLevel::trace}; client1.start(); client2.start(); std::atomic good1{false}, good2{false}; auto r1 = client1.connect_remote(address{listen}, [&](auto conn) { good1 = true; }, [&](auto conn, std::string_view reason) { auto lock = catch_lock(); INFO("connection failed: " << reason); } ); auto r2 = client2.connect_remote(address{listen}, [&](auto conn) { good2 = true; }, [&](auto conn, std::string_view reason) { auto lock = catch_lock(); INFO("connection failed: " << reason); } ); wait_for_conn(good1); wait_for_conn(good2); { auto lock = catch_lock(); REQUIRE( good1 ); REQUIRE( good2 ); REQUIRE( first == second ); REQUIRE_FALSE( first ); REQUIRE_FALSE( second ); } good1 = false; good2 = false; client1.request(r1, "x.x", [&](auto success_, auto parts_) { good1 = true; }); client2.request(r2, "x.y", [&](auto success_, auto parts_) { good2 = true; }); reply_sleep(); { auto lock = catch_lock(); REQUIRE( good1 ); REQUIRE( good2 ); REQUIRE_FALSE( first == second ); REQUIRE_FALSE( std::hash{}(first) == std::hash{}(second) ); } } TEST_CASE("SN disconnections", "[connect][disconnect]") { std::vector> omq; std::vector pubkey, privkey; std::unordered_map conn; REQUIRE(sodium_init() != -1); for (int i = 0; i < 3; i++) { pubkey.emplace_back(); privkey.emplace_back(); pubkey[i].resize(crypto_box_PUBLICKEYBYTES); privkey[i].resize(crypto_box_SECRETKEYBYTES); crypto_box_keypair(reinterpret_cast(&pubkey[i][0]), reinterpret_cast(&privkey[i][0])); conn.emplace(pubkey[i], random_localhost()); } std::atomic his{0}; for (int i = 0; i < pubkey.size(); i++) { omq.push_back(std::make_unique( pubkey[i], privkey[i], true, [conn](auto pk) { auto it = conn.find((std::string) pk); if (it != conn.end()) return it->second; return ""s; }, get_logger("S" + std::to_string(i) + "» "), LogLevel::trace )); auto& server = *omq.back(); server.listen_curve(conn[pubkey[i]]); server.add_category("sn", Access{AuthLevel::none, true}) .add_command("hi", [&](Message& m) { his++; }); server.set_active_sns({pubkey.begin(), pubkey.end()}); server.start(); } omq[0]->send(pubkey[1], "sn.hi"); omq[0]->send(pubkey[2], "sn.hi"); omq[2]->send(pubkey[0], "sn.hi"); omq[2]->send(pubkey[1], "sn.hi"); omq[1]->send(pubkey[0], "BYE"); omq[0]->send(pubkey[2], "sn.hi"); std::this_thread::sleep_for(50ms * TIME_DILATION); auto lock = catch_lock(); REQUIRE(his == 5); } TEST_CASE("SN auth checks", "[sandwich][auth]") { // When a remote connects, we check its authentication level; if at the time of connection it // isn't recognized as a SN but tries to invoke a SN command it'll be told to disconnect; if it // tries to send again it should reconnect and reauthenticate. This test is meant to test this // pattern where the reconnection/reauthentication now authenticates it as a SN. std::string listen = random_localhost(); std::string pubkey, privkey; pubkey.resize(crypto_box_PUBLICKEYBYTES); privkey.resize(crypto_box_SECRETKEYBYTES); REQUIRE(sodium_init() != -1); crypto_box_keypair(reinterpret_cast(&pubkey[0]), reinterpret_cast(&privkey[0])); OxenMQ server{ pubkey, privkey, true, // service node [](auto) { return ""; }, get_logger("A» "), LogLevel::trace }; std::atomic incoming_is_sn{false}; server.listen_curve(listen); server.add_category("public", Access{AuthLevel::none}) .add_request_command("hello", [&](Message& m) { m.send_reply("hi"); }) .add_request_command("sudo", [&](Message& m) { server.update_active_sns({{m.conn.pubkey()}}, {}); m.send_reply("making sandwiches"); }) .add_request_command("nosudo", [&](Message& m) { // Send the reply *first* because if we do it the other way we'll have just removed // ourselves from the list of SNs and thus would try to open an outbound connection // to deliver it since it's still queued as a message to a SN. m.send_reply("make them yourself"); server.update_active_sns({}, {{m.conn.pubkey()}}); }); server.add_category("sandwich", Access{AuthLevel::none, true}) .add_request_command("make", [&](Message& m) { m.send_reply("okay"); }); server.start(); OxenMQ client{ "", "", false, [&](auto remote_pk) { if (remote_pk == pubkey) return listen; return ""s; }, get_logger("B» "), LogLevel::trace}; client.start(); std::atomic got{false}; bool success; client.request(pubkey, "public.hello", [&](auto success_, auto) { success = success_; got = true; }); wait_for_conn(got); { auto lock = catch_lock(); REQUIRE( got ); REQUIRE( success ); } got = false; using dvec = std::vector; dvec data; client.request(pubkey, "sandwich.make", [&](auto success_, auto data_) { success = success_; data = std::move(data_); got = true; }); reply_sleep(); { auto lock = catch_lock(); REQUIRE( got ); REQUIRE_FALSE( success ); REQUIRE( data == dvec{{"FORBIDDEN_SN"}} ); } // Somebody set up us the bomb. Main sudo turn on. got = false; client.request(pubkey, "public.sudo", [&](auto success_, auto data_) { success = success_; data = data_; got = true; }); reply_sleep(); { auto lock = catch_lock(); REQUIRE( got ); REQUIRE( success ); REQUIRE( data == dvec{{"making sandwiches"}} ); } got = false; client.request(pubkey, "sandwich.make", [&](auto success_, auto data_) { success = success_; data = std::move(data_); got = true; }); reply_sleep(); { auto lock = catch_lock(); REQUIRE( got ); REQUIRE( success ); REQUIRE( data == dvec{{"okay"}} ); } // Take off every 'SUDO', You [not] know what you doing got = false; client.request(pubkey, "public.nosudo", [&](auto success_, auto data_) { success = success_; data = data_; got = true; }); reply_sleep(); { auto lock = catch_lock(); REQUIRE( got ); REQUIRE( success ); REQUIRE( data == dvec{{"make them yourself"}} ); } got = false; client.request(pubkey, "sandwich.make", [&](auto success_, auto data_) { success = success_; data = std::move(data_); got = true; }); reply_sleep(); { auto lock = catch_lock(); REQUIRE( got ); REQUIRE_FALSE( success ); REQUIRE( data == dvec{{"FORBIDDEN_SN"}} ); } } TEST_CASE("SN single worker test", "[connect][worker]") { // Tests a failure case that could trigger when all workers are allocated (here we make that // simpler by just having one worker). std::string listen = random_localhost(); OxenMQ server{ "", "", false, // service node [](auto) { return ""; }, get_logger("S» "), LogLevel::trace }; server.set_general_threads(1); server.set_batch_threads(0); server.set_reply_threads(0); server.listen_plain(listen); server.add_category("c", Access{AuthLevel::none}) .add_request_command("x", [&](Message& m) { m.send_reply(); }) ; server.start(); OxenMQ client{get_logger("B» "), LogLevel::trace}; client.start(); auto conn = client.connect_remote(address{listen}, [](auto) {}, [](auto, auto) {}); std::atomic got{0}; std::atomic success{0}; client.request(conn, "c.x", [&](auto success_, auto) { if (success_) ++success; ++got; }); wait_for([&] { return got.load() >= 1; }); { auto lock = catch_lock(); REQUIRE( success == 1 ); } client.request(conn, "c.x", [&](auto success_, auto) { if (success_) ++success; ++got; }); wait_for([&] { return got.load() >= 2; }); { auto lock = catch_lock(); REQUIRE( success == 2 ); } } TEST_CASE("SN backchatter", "[connect][sn]") { // When we have a SN connection A -> B and then B sends a message to A on that existing // connection, A should see it as coming from B. std::vector> omq; std::vector pubkey, privkey; std::unordered_map conn; REQUIRE(sodium_init() != -1); for (int i = 0; i < 2; i++) { pubkey.emplace_back(); privkey.emplace_back(); pubkey[i].resize(crypto_box_PUBLICKEYBYTES); privkey[i].resize(crypto_box_SECRETKEYBYTES); crypto_box_keypair(reinterpret_cast(&pubkey[i][0]), reinterpret_cast(&privkey[i][0])); conn.emplace(pubkey[i], random_localhost()); } for (int i = 0; i < pubkey.size(); i++) { omq.push_back(std::make_unique( pubkey[i], privkey[i], true, [conn](auto pk) { auto it = conn.find((std::string) pk); if (it != conn.end()) return it->second; return ""s; }, get_logger("S" + std::to_string(i) + "» "), LogLevel::trace )); auto& server = *omq.back(); server.listen_curve(conn[pubkey[i]]); server.set_active_sns({pubkey.begin(), pubkey.end()}); } std::string f; omq[0]->add_category("a", Access{AuthLevel::none, true}) .add_command("a", [&](Message& m) { m.oxenmq.send(m.conn, "b.b", "abc"); //m.send_back("b.b", "abc"); }) .add_command("z", [&](Message& m) { auto lock = catch_lock(); f = m.data[0]; }); omq[1]->add_category("b", Access{AuthLevel::none, true}) .add_command("b", [&](Message& m) { { auto lock = catch_lock(); UNSCOPED_INFO("b.b from conn " << m.conn); } m.send_back("a.z", m.data[0]); }); for (auto& server : omq) server->start(); auto c = omq[1]->connect_sn(pubkey[0]); omq[1]->send(c, "a.a"); std::this_thread::sleep_for(50ms); auto lock = catch_lock(); REQUIRE(f == "abc"); } TEST_CASE("inproc connections", "[connect][inproc]") { std::string inproc_name = "foo"; OxenMQ omq{get_logger("OMQ» "), LogLevel::trace}; omq.add_category("public", Access{AuthLevel::none}); omq.add_request_command("public", "hello", [&](Message& m) { m.send_reply("hi"); }); omq.start(); std::atomic got{0}; bool success = false; auto c_inproc = omq.connect_inproc( [&](auto conn) { success = true; got++; }, [&](auto conn, std::string_view reason) { auto lock = catch_lock(); INFO("inproc connection failed: " << reason); got++; } ); wait_for([&got] { return got.load() > 0; }); { auto lock = catch_lock(); REQUIRE( success ); REQUIRE( got == 1 ); } got = 0; success = false; omq.request(c_inproc, "public.hello", [&](auto success_, auto parts_) { success = success_ && parts_.size() == 1 && parts_.front() == "hi"; got++; }); reply_sleep(); { auto lock = catch_lock(); REQUIRE( got == 1 ); REQUIRE( success ); } } TEST_CASE("no explicit inproc listening", "[connect][inproc]") { OxenMQ omq{get_logger("OMQ» "), LogLevel::trace}; REQUIRE_THROWS_AS(omq.listen_plain("inproc://foo"), std::logic_error); REQUIRE_THROWS_AS(omq.listen_curve("inproc://foo"), std::logic_error); } TEST_CASE("inproc connection permissions", "[connect][inproc]") { std::string listen = random_localhost(); OxenMQ omq{get_logger("OMQ» "), LogLevel::trace}; omq.add_category("public", Access{AuthLevel::none}); omq.add_request_command("public", "hello", [&](Message& m) { m.send_reply("hi"); }); omq.add_category("private", Access{AuthLevel::admin}); omq.add_request_command("private", "handshake", [&](Message& m) { m.send_reply("yo dude"); }); omq.listen_plain(listen); omq.start(); std::atomic got{0}; bool success = false; auto c_inproc = omq.connect_inproc( [&](auto conn) { success = true; got++; }, [&](auto conn, std::string_view reason) { auto lock = catch_lock(); INFO("inproc connection failed: " << reason); got++; } ); bool pub_success = false; auto c_pub = omq.connect_remote(address{listen}, [&](auto conn) { pub_success = true; got++; }, [&](auto conn, std::string_view reason) { auto lock = catch_lock(); INFO("tcp connection failed: " << reason); got++; } ); wait_for([&got] { return got.load() == 2; }); { auto lock = catch_lock(); REQUIRE( got == 2 ); REQUIRE( success ); REQUIRE( pub_success ); } got = 0; success = false; pub_success = false; bool success_private = false; bool pub_success_private = false; omq.request(c_inproc, "public.hello", [&](auto success_, auto parts_) { success = success_ && parts_.size() == 1 && parts_.front() == "hi"; got++; }); omq.request(c_pub, "public.hello", [&](auto success_, auto parts_) { pub_success = success_ && parts_.size() == 1 && parts_.front() == "hi"; got++; }); omq.request(c_inproc, "private.handshake", [&](auto success_, auto parts_) { success_private = success_ && parts_.size() == 1 && parts_.front() == "yo dude"; got++; }); omq.request(c_pub, "private.handshake", [&](auto success_, auto parts_) { pub_success_private = success_; got++; }); wait_for([&got] { return got.load() == 4; }); { auto lock = catch_lock(); REQUIRE( got == 4 ); REQUIRE( success ); REQUIRE( pub_success ); REQUIRE( success_private ); REQUIRE_FALSE( pub_success_private ); } }