Browse Source

Add support for inproc: requests

inproc support is special in zmq: in particular it completely bypasses
the auth layer, which causes problems in OxenMQ because we assume that a
message will always have auth information (set during initial connection
handshake).

This adds an "always-on" inproc listener and adds a new `connect_inproc`
method for a caller to establish a connection to it.

It also throws exceptions if you try to `listen_plain` or `listen_curve`
on an inproc address, because that won't work for the reasons detailed
above.
pull/43/head
Jason Rhinelander 11 months ago
parent
commit
f553085558
  1. 2
      CMakeLists.txt
  2. 8
      oxenmq/oxenmq.cpp
  3. 39
      oxenmq/oxenmq.h
  4. 7
      oxenmq/proxy.cpp
  5. 5
      oxenmq/worker.cpp
  6. 104
      tests/test_connect.cpp

2
CMakeLists.txt

@ -17,7 +17,7 @@ cmake_minimum_required(VERSION 3.7)
set(CMAKE_OSX_DEPLOYMENT_TARGET 10.12 CACHE STRING "macOS deployment target (Apple clang only)")
project(liboxenmq
VERSION 1.2.6
VERSION 1.2.7
LANGUAGES CXX C)
include(GNUInstallDirs)

8
oxenmq/oxenmq.cpp

@ -262,7 +262,9 @@ void OxenMQ::start() {
}
void OxenMQ::listen_curve(std::string bind_addr, AllowFunc allow_connection, std::function<void(bool)> on_bind) {
if (!allow_connection) allow_connection = [](auto, auto, auto) { return AuthLevel::none; };
if (std::string_view{bind_addr}.substr(0, 9) == "inproc://")
throw std::logic_error{"inproc:// cannot be used with listen_curve"};
if (!allow_connection) allow_connection = [](auto&&...) { return AuthLevel::none; };
bind_data d{std::move(bind_addr), true, std::move(allow_connection), std::move(on_bind)};
if (proxy_thread.joinable())
detail::send_control(get_control_socket(), "BIND", bt_serialize(detail::serialize_object(std::move(d))));
@ -271,7 +273,9 @@ void OxenMQ::listen_curve(std::string bind_addr, AllowFunc allow_connection, std
}
void OxenMQ::listen_plain(std::string bind_addr, AllowFunc allow_connection, std::function<void(bool)> on_bind) {
if (!allow_connection) allow_connection = [](auto, auto, auto) { return AuthLevel::none; };
if (std::string_view{bind_addr}.substr(0, 9) == "inproc://")
throw std::logic_error{"inproc:// cannot be used with listen_plain"};
if (!allow_connection) allow_connection = [](auto&&...) { return AuthLevel::none; };
bind_data d{std::move(bind_addr), false, std::move(allow_connection), std::move(on_bind)};
if (proxy_thread.joinable())
detail::send_control(get_control_socket(), "BIND", bt_serialize(detail::serialize_object(std::move(d))));

39
oxenmq/oxenmq.h

@ -82,6 +82,9 @@ inline constexpr auto DEFAULT_CONNECT_SN_KEEP_ALIVE = 5min;
// The default timeout for connect_remote()
inline constexpr auto REMOTE_CONNECT_TIMEOUT = 10s;
// Default timeout for connect_inproc()
inline constexpr auto INPROC_CONNECT_TIMEOUT = 50ms;
// The amount of time we wait for a reply to a REQUEST before calling the callback with
// `false` to signal a timeout.
inline constexpr auto DEFAULT_REQUEST_TIMEOUT = 15s;
@ -410,6 +413,9 @@ private:
/// The connections to/from remotes we currently have open, both listening and outgoing.
std::map<int64_t, zmq::socket_t> connections;
/// The connection ID of the built-in inproc listener for making requests to self
int64_t inproc_listener_connid;
/// If set then it indicates a change in `connections` which means we need to rebuild pollitems
/// and stop using existing connections iterators.
bool connections_updated = true;
@ -1000,7 +1006,7 @@ public:
* connections on this address to determine the incoming remote's access and authentication
* level. Note that `allow_connection` here will be called with an empty pubkey.
*
* @param bind address - can be any string zmq supports; typically a tcp IP/port combination
* @param bind address - can be any string zmq supports, for example a tcp IP/port combination
* such as: "tcp://\*:4567" or "tcp://1.2.3.4:5678".
*
* @param allow_connection function to call to determine whether to allow the connection and, if
@ -1095,6 +1101,16 @@ public:
AuthLevel auth_level = AuthLevel::none,
std::chrono::milliseconds timeout = REMOTE_CONNECT_TIMEOUT);
/// Connects to the built-in in-process listening socket of this OxenMQ server for local
/// communication. Note that auth_level defaults to admin (unlike connect_remote), and the
/// default timeout is much shorter.
///
/// Also note that incoming inproc requests are unauthenticated: that is, they will always have
/// admin-level access.
template <typename... Option>
ConnectionID connect_inproc(ConnectSuccess on_connect, ConnectFailure on_failure,
const Option&... options);
/**
* Disconnects an established outgoing connection established with `connect_remote()` (or, less
* commonly, `connect_sn()`).
@ -1678,6 +1694,27 @@ ConnectionID OxenMQ::connect_sn(std::string_view pubkey, const Option&... option
return pubkey;
}
template <typename... Option>
ConnectionID OxenMQ::connect_inproc(ConnectSuccess on_connect, ConnectFailure on_failure,
const Option&... options) {
bt_dict opts{
{"timeout", INPROC_CONNECT_TIMEOUT.count()},
{"auth_level", static_cast<std::underlying_type_t<AuthLevel>>(AuthLevel::admin)}
};
(detail::apply_connect_option(*this, true, opts, options), ...);
auto id = next_conn_id++;
opts["conn_id"] = id;
opts["connect"] = detail::serialize_object(std::move(on_connect));
opts["failure"] = detail::serialize_object(std::move(on_failure));
opts["remote"] = "inproc://sn-self";
detail::send_control(get_control_socket(), "CONNECT_REMOTE", bt_serialize(opts));
return id;
}
template <typename... T>
void OxenMQ::send(ConnectionID to, std::string_view cmd, const T&... opts) {
detail::send_control(get_control_socket(), "SEND",

7
oxenmq/proxy.cpp

@ -411,6 +411,13 @@ void OxenMQ::proxy_loop() {
saved_umask = umask(STARTUP_UMASK);
#endif
{
zmq::socket_t inproc_listener{context, zmq::socket_type::router};
inproc_listener.bind(SN_ADDR_SELF);
inproc_listener_connid = next_conn_id++;
connections.emplace_hint(connections.end(), inproc_listener_connid, std::move(inproc_listener));
}
for (size_t i = 0; i < bind.size(); i++) {
if (!proxy_bind(bind[i], i)) {
LMQ_LOG(warn, "OxenMQ failed to listen on ", bind[i].address);

5
oxenmq/worker.cpp

@ -293,6 +293,11 @@ void OxenMQ::proxy_to_worker(int64_t conn_id, zmq::socket_t& sock, std::vector<z
return;
}
peer = &it->second;
} else if (conn_id == inproc_listener_connid) {
tmp_peer.auth_level = AuthLevel::admin;
tmp_peer.pubkey = pubkey;
tmp_peer.service_node = active_service_nodes.count(pubkey);
peer = &tmp_peer;
} else {
std::tie(tmp_peer.pubkey, tmp_peer.auth_level) = detail::extract_metadata(parts.back());
tmp_peer.service_node = tmp_peer.pubkey.size() == 32 && active_service_nodes.count(tmp_peer.pubkey);

104
tests/test_connect.cpp

@ -504,3 +504,107 @@ TEST_CASE("SN backchatter", "[connect][sn]") {
auto lock = catch_lock();
REQUIRE(f == "abc");
}
TEST_CASE("inproc connections", "[connect][inproc]") {
std::string inproc_name = "foo";
OxenMQ omq{get_logger("OMQ» "), LogLevel::trace};
omq.add_category("public", Access{AuthLevel::none});
omq.add_request_command("public", "hello", [&](Message& m) { m.send_reply("hi"); });
omq.start();
std::atomic<int> got{0};
bool success = false;
auto c_inproc = omq.connect_inproc(
[&](auto conn) { success = true; got++; },
[&](auto conn, std::string_view reason) { auto lock = catch_lock(); INFO("inproc connection failed: " << reason); got++; }
);
wait_for([&got] { return got.load() > 0; });
{
auto lock = catch_lock();
REQUIRE( success );
REQUIRE( got == 1 );
}
got = 0;
success = false;
omq.request(c_inproc, "public.hello", [&](auto success_, auto parts_) {
success = success_ && parts_.size() == 1 && parts_.front() == "hi"; got++;
});
reply_sleep();
{
auto lock = catch_lock();
REQUIRE( got == 1 );
REQUIRE( success );
}
}
TEST_CASE("no explicit inproc listening", "[connect][inproc]") {
OxenMQ omq{get_logger("OMQ» "), LogLevel::trace};
REQUIRE_THROWS_AS(omq.listen_plain("inproc://foo"), std::logic_error);
REQUIRE_THROWS_AS(omq.listen_curve("inproc://foo"), std::logic_error);
}
TEST_CASE("inproc connection permissions", "[connect][inproc]") {
std::string listen = random_localhost();
OxenMQ omq{get_logger("OMQ» "), LogLevel::trace};
omq.add_category("public", Access{AuthLevel::none});
omq.add_request_command("public", "hello", [&](Message& m) { m.send_reply("hi"); });
omq.add_category("private", Access{AuthLevel::admin});
omq.add_request_command("private", "handshake", [&](Message& m) { m.send_reply("yo dude"); });
omq.listen_plain(listen);
omq.start();
std::atomic<int> got{0};
bool success = false;
auto c_inproc = omq.connect_inproc(
[&](auto conn) { success = true; got++; },
[&](auto conn, std::string_view reason) { auto lock = catch_lock(); INFO("inproc connection failed: " << reason); got++; }
);
bool pub_success = false;
auto c_pub = omq.connect_remote(address{listen},
[&](auto conn) { pub_success = true; got++; },
[&](auto conn, std::string_view reason) { auto lock = catch_lock(); INFO("tcp connection failed: " << reason); got++; }
);
wait_for([&got] { return got.load() == 2; });
{
auto lock = catch_lock();
REQUIRE( got == 2 );
REQUIRE( success );
REQUIRE( pub_success );
}
got = 0;
success = false;
pub_success = false;
bool success_private = false;
bool pub_success_private = false;
omq.request(c_inproc, "public.hello", [&](auto success_, auto parts_) {
success = success_ && parts_.size() == 1 && parts_.front() == "hi"; got++;
});
omq.request(c_pub, "public.hello", [&](auto success_, auto parts_) {
pub_success = success_ && parts_.size() == 1 && parts_.front() == "hi"; got++;
});
omq.request(c_inproc, "private.handshake", [&](auto success_, auto parts_) {
success_private = success_ && parts_.size() == 1 && parts_.front() == "yo dude"; got++;
});
omq.request(c_pub, "private.handshake", [&](auto success_, auto parts_) {
pub_success_private = success_; got++;
});
wait_for([&got] { return got.load() == 4; });
{
auto lock = catch_lock();
REQUIRE( got == 4 );
REQUIRE( success );
REQUIRE( pub_success );
REQUIRE( success_private );
REQUIRE_FALSE( pub_success_private );
}
}

Loading…
Cancel
Save