diff --git a/CMakeLists.txt b/CMakeLists.txt
index fee9fcb..affe3f5 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -17,7 +17,7 @@ cmake_minimum_required(VERSION 3.7)
 set(CMAKE_OSX_DEPLOYMENT_TARGET 10.12 CACHE STRING "macOS deployment target (Apple clang only)")
 
 project(liboxenmq
-    VERSION 1.2.6
+    VERSION 1.2.7
     LANGUAGES CXX C)
 
 include(GNUInstallDirs)
diff --git a/oxenmq/oxenmq.cpp b/oxenmq/oxenmq.cpp
index c30095b..ef802d9 100644
--- a/oxenmq/oxenmq.cpp
+++ b/oxenmq/oxenmq.cpp
@@ -262,7 +262,9 @@ void OxenMQ::start() {
 }
 
 void OxenMQ::listen_curve(std::string bind_addr, AllowFunc allow_connection, std::function<void(bool)> on_bind) {
-    if (!allow_connection) allow_connection = [](auto, auto, auto) { return AuthLevel::none; };
+    if (std::string_view{bind_addr}.substr(0, 9) == "inproc://")
+        throw std::logic_error{"inproc:// cannot be used with listen_curve"};
+    if (!allow_connection) allow_connection = [](auto&&...) { return AuthLevel::none; };
     bind_data d{std::move(bind_addr), true, std::move(allow_connection), std::move(on_bind)};
     if (proxy_thread.joinable())
         detail::send_control(get_control_socket(), "BIND", bt_serialize(detail::serialize_object(std::move(d))));
@@ -271,7 +273,9 @@ void OxenMQ::listen_curve(std::string bind_addr, AllowFunc allow_connection, std
 }
 
 void OxenMQ::listen_plain(std::string bind_addr, AllowFunc allow_connection, std::function<void(bool)> on_bind) {
-    if (!allow_connection) allow_connection = [](auto, auto, auto) { return AuthLevel::none; };
+    if (std::string_view{bind_addr}.substr(0, 9) == "inproc://")
+        throw std::logic_error{"inproc:// cannot be used with listen_plain"};
+    if (!allow_connection) allow_connection = [](auto&&...) { return AuthLevel::none; };
     bind_data d{std::move(bind_addr), false, std::move(allow_connection), std::move(on_bind)};
     if (proxy_thread.joinable())
         detail::send_control(get_control_socket(), "BIND", bt_serialize(detail::serialize_object(std::move(d))));
diff --git a/oxenmq/oxenmq.h b/oxenmq/oxenmq.h
index 6b49b35..a0d484a 100644
--- a/oxenmq/oxenmq.h
+++ b/oxenmq/oxenmq.h
@@ -82,6 +82,9 @@ inline constexpr auto DEFAULT_CONNECT_SN_KEEP_ALIVE = 5min;
 // The default timeout for connect_remote()
 inline constexpr auto REMOTE_CONNECT_TIMEOUT = 10s;
 
+// Default timeout for connect_inproc()
+inline constexpr auto INPROC_CONNECT_TIMEOUT = 50ms;
+
 // The amount of time we wait for a reply to a REQUEST before calling the callback with
 // `false` to signal a timeout.
 inline constexpr auto DEFAULT_REQUEST_TIMEOUT = 15s;
@@ -410,6 +413,9 @@ private:
     /// The connections to/from remotes we currently have open, both listening and outgoing.
     std::map<int64_t, zmq::socket_t> connections;
 
+    /// The connection ID of the built-in inproc listener for making requests to self
+    int64_t inproc_listener_connid;
+
     /// If set then it indicates a change in `connections` which means we need to rebuild pollitems
     /// and stop using existing connections iterators.
     bool connections_updated = true;
@@ -1000,7 +1006,7 @@ public:
      * connections on this address to determine the incoming remote's access and authentication
      * level. Note that `allow_connection` here will be called with an empty pubkey.
     *
-     * @param bind address - can be any string zmq supports; typically a tcp IP/port combination
+     * @param bind address - can be any string zmq supports, for example a tcp IP/port combination
      * such as: "tcp://\*:4567" or "tcp://1.2.3.4:5678".
      *
      * @param allow_connection function to call to determine whether to allow the connection and, if
@@ -1095,6 +1101,16 @@ public:
             AuthLevel auth_level = AuthLevel::none,
             std::chrono::milliseconds timeout = REMOTE_CONNECT_TIMEOUT);
 
+    /// Connects to the built-in in-process listening socket of this OxenMQ server for local
+    /// communication. Note that auth_level defaults to admin (unlike connect_remote), and the
+    /// default timeout is much shorter.
+    ///
+    /// Also note that incoming inproc requests are unauthenticated: that is, they will always have
+    /// admin-level access.
+    template <typename... Option>
+    ConnectionID connect_inproc(ConnectSuccess on_connect, ConnectFailure on_failure,
+            const Option&... options);
+
     /**
      * Disconnects an established outgoing connection established with `connect_remote()` (or, less
      * commonly, `connect_sn()`).
@@ -1678,6 +1694,27 @@ ConnectionID OxenMQ::connect_sn(std::string_view pubkey, const Option&... option
     return pubkey;
 }
 
+template <typename... Option>
+ConnectionID OxenMQ::connect_inproc(ConnectSuccess on_connect, ConnectFailure on_failure,
+        const Option&... options) {
+    bt_dict opts{
+        {"timeout", INPROC_CONNECT_TIMEOUT.count()},
+        {"auth_level", static_cast<std::underlying_type_t<AuthLevel>>(AuthLevel::admin)}
+    };
+
+    (detail::apply_connect_option(*this, true, opts, options), ...);
+
+    auto id = next_conn_id++;
+    opts["conn_id"] = id;
+    opts["connect"] = detail::serialize_object(std::move(on_connect));
+    opts["failure"] = detail::serialize_object(std::move(on_failure));
+    opts["remote"] = "inproc://sn-self";
+
+    detail::send_control(get_control_socket(), "CONNECT_REMOTE", bt_serialize(opts));
+
+    return id;
+}
+
 template <typename... T>
 void OxenMQ::send(ConnectionID to, std::string_view cmd, const T&... opts) {
     detail::send_control(get_control_socket(), "SEND",
diff --git a/oxenmq/proxy.cpp b/oxenmq/proxy.cpp
index db564e1..1bae0fd 100644
--- a/oxenmq/proxy.cpp
+++ b/oxenmq/proxy.cpp
@@ -411,6 +411,13 @@ void OxenMQ::proxy_loop() {
         saved_umask = umask(STARTUP_UMASK);
 #endif
 
+    {
+        zmq::socket_t inproc_listener{context, zmq::socket_type::router};
+        inproc_listener.bind(SN_ADDR_SELF);
+        inproc_listener_connid = next_conn_id++;
+        connections.emplace_hint(connections.end(), inproc_listener_connid, std::move(inproc_listener));
+    }
+
     for (size_t i = 0; i < bind.size(); i++) {
         if (!proxy_bind(bind[i], i)) {
             LMQ_LOG(warn, "OxenMQ failed to listen on ", bind[i].address);
diff --git a/oxenmq/worker.cpp b/oxenmq/worker.cpp
index ae5046f..8d65197 100644
--- a/oxenmq/worker.cpp
+++ b/oxenmq/worker.cpp
@@ -293,6 +293,11 @@ void OxenMQ::proxy_to_worker(int64_t conn_id, zmq::socket_t& sock, std::vector
             return;
         }
         peer = &it->second;
+    } else if (conn_id == inproc_listener_connid) {
+        tmp_peer.auth_level = AuthLevel::admin;
+        tmp_peer.pubkey = pubkey;
+        tmp_peer.service_node = active_service_nodes.count(pubkey);
+        peer = &tmp_peer;
     } else {
         std::tie(tmp_peer.pubkey, tmp_peer.auth_level) = detail::extract_metadata(parts.back());
         tmp_peer.service_node = tmp_peer.pubkey.size() == 32 && active_service_nodes.count(tmp_peer.pubkey);
diff --git a/tests/test_connect.cpp b/tests/test_connect.cpp
index b6cf109..84e20e8 100644
--- a/tests/test_connect.cpp
+++ b/tests/test_connect.cpp
@@ -504,3 +504,107 @@ TEST_CASE("SN backchatter", "[connect][sn]") {
     auto lock = catch_lock();
     REQUIRE(f == "abc");
 }
+
+TEST_CASE("inproc connections", "[connect][inproc]") {
+    std::string inproc_name = "foo";
+    OxenMQ omq{get_logger("OMQ» "), LogLevel::trace};
+
+    omq.add_category("public", Access{AuthLevel::none});
+    omq.add_request_command("public", "hello", [&](Message& m) { m.send_reply("hi"); });
+
+    omq.start();
+
+    std::atomic<int> got{0};
+    bool success = false;
+    auto c_inproc = omq.connect_inproc(
+            [&](auto conn) { success = true; got++; },
+            [&](auto conn, std::string_view reason) { auto lock = catch_lock(); INFO("inproc connection failed: " << reason); got++; }
+            );
+
+    wait_for([&got] { return got.load() > 0; });
+    {
+        auto lock = catch_lock();
+        REQUIRE( success );
+        REQUIRE( got == 1 );
+    }
+
+    got = 0;
+    success = false;
+    omq.request(c_inproc, "public.hello", [&](auto success_, auto parts_) {
+            success = success_ && parts_.size() == 1 && parts_.front() == "hi"; got++;
+    });
+    reply_sleep();
+    {
+        auto lock = catch_lock();
+        REQUIRE( got == 1 );
+        REQUIRE( success );
+    }
+}
+
+TEST_CASE("no explicit inproc listening", "[connect][inproc]") {
+    OxenMQ omq{get_logger("OMQ» "), LogLevel::trace};
+    REQUIRE_THROWS_AS(omq.listen_plain("inproc://foo"), std::logic_error);
+    REQUIRE_THROWS_AS(omq.listen_curve("inproc://foo"), std::logic_error);
+}
+
+TEST_CASE("inproc connection permissions", "[connect][inproc]") {
+    std::string listen = random_localhost();
+    OxenMQ omq{get_logger("OMQ» "), LogLevel::trace};
+
+    omq.add_category("public", Access{AuthLevel::none});
+    omq.add_request_command("public", "hello", [&](Message& m) { m.send_reply("hi"); });
+    omq.add_category("private", Access{AuthLevel::admin});
+    omq.add_request_command("private", "handshake", [&](Message& m) { m.send_reply("yo dude"); });
+
+    omq.listen_plain(listen);
+
+    omq.start();
+
+    std::atomic<int> got{0};
+    bool success = false;
+    auto c_inproc = omq.connect_inproc(
+            [&](auto conn) { success = true; got++; },
+            [&](auto conn, std::string_view reason) { auto lock = catch_lock(); INFO("inproc connection failed: " << reason); got++; }
+            );
+
+    bool pub_success = false;
+    auto c_pub = omq.connect_remote(address{listen},
+            [&](auto conn) { pub_success = true; got++; },
+            [&](auto conn, std::string_view reason) { auto lock = catch_lock(); INFO("tcp connection failed: " << reason); got++; }
+            );
+
+    wait_for([&got] { return got.load() == 2; });
+    {
+        auto lock = catch_lock();
+        REQUIRE( got == 2 );
+        REQUIRE( success );
+        REQUIRE( pub_success );
+    }
+
+    got = 0;
+    success = false;
+    pub_success = false;
+    bool success_private = false;
+    bool pub_success_private = false;
+    omq.request(c_inproc, "public.hello", [&](auto success_, auto parts_) {
+            success = success_ && parts_.size() == 1 && parts_.front() == "hi"; got++;
+    });
+    omq.request(c_pub, "public.hello", [&](auto success_, auto parts_) {
+            pub_success = success_ && parts_.size() == 1 && parts_.front() == "hi"; got++;
+    });
+    omq.request(c_inproc, "private.handshake", [&](auto success_, auto parts_) {
+            success_private = success_ && parts_.size() == 1 && parts_.front() == "yo dude"; got++;
+    });
+    omq.request(c_pub, "private.handshake", [&](auto success_, auto parts_) {
+            pub_success_private = success_; got++;
+    });
+    wait_for([&got] { return got.load() == 4; });
+    {
+        auto lock = catch_lock();
+        REQUIRE( got == 4 );
+        REQUIRE( success );
+        REQUIRE( pub_success );
+        REQUIRE( success_private );
+        REQUIRE_FALSE( pub_success_private );
+    }
+}
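
Reviewer note (not part of the patch): below is a minimal sketch of how the new connect_inproc() API is intended to be used by an application built against this change. The "public.hello" command, the no-op logger, and the sleep-based wait are illustrative assumptions only; real code would use its own commands and synchronization (the tests above use the catch_lock/wait_for helpers instead).

    // sketch.cpp -- hypothetical example, not part of the patch.
    #include <oxenmq/oxenmq.h>

    #include <chrono>
    #include <iostream>
    #include <string>
    #include <string_view>
    #include <thread>
    #include <vector>

    using namespace oxenmq;

    int main() {
        // A plain OxenMQ instance with a no-op logger; no external listeners are needed
        // because the proxy thread now always binds the internal inproc:// router at startup.
        OxenMQ omq{[](LogLevel, const char*, int, std::string) {}};

        // Illustrative command registration (mirrors the "public.hello" handler in the tests).
        omq.add_category("public", Access{AuthLevel::none});
        omq.add_request_command("public", "hello", [](Message& m) { m.send_reply("hi"); });

        omq.start();

        // connect_inproc() returns a ConnectionID for talking to ourselves; the connection
        // defaults to AuthLevel::admin and uses the short INPROC_CONNECT_TIMEOUT.
        auto conn = omq.connect_inproc(
                [](ConnectionID) { /* connected */ },
                [](ConnectionID, std::string_view reason) {
                    std::cerr << "inproc connect failed: " << reason << "\n";
                });

        // Requests over the inproc connection behave like requests over any other connection.
        omq.request(conn, "public.hello", [](bool success, std::vector<std::string> parts) {
            if (success && !parts.empty())
                std::cout << "got reply: " << parts.front() << "\n";
        });

        // Crude wait for the asynchronous reply before the OxenMQ destructor shuts things down.
        std::this_thread::sleep_for(std::chrono::milliseconds{200});
    }

Because the proxy binds the internal inproc:// listener unconditionally at startup, connect_inproc() works even on instances that never call listen_plain() or listen_curve(); that is also why explicitly listening on an inproc:// address now throws std::logic_error, as exercised by the "no explicit inproc listening" test above.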