From 57f0ca74da7643b978c69203fd2a8a2f18a8bc5c Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Fri, 28 Feb 2020 00:16:43 -0400 Subject: [PATCH] Added support for general (non-SN) communications The existing code was largely set up for SN-to-SN or client-to-SN communications, where messages can always get to the right place because we can always send by pubkey. This doesn't work when we want general communications with a random remote address. This commit overhauls the way loki-mq handles communication in a few important ways: - Listening instances no longer pass bind addresses into the constructor; instead they call `listen_curve()` or `listen_plain()` before invoking `start()`. - `listen_curve()` is equivalent to the existing bind support: it listens on a socket and accepts encrypted handshaked connections from anyone who already knows the server's public key. - `listen_plain()` is all new: it sets up a plain text listening socket over which random clients can connect and talk. End-points aren't verified, and it isn't encrypted, but if you don't know who you are talking to then encryption isn't doing anything anyway. - Connecting to a remote now connections in CURVE encryption or NULL (plain-text) encryption based on whether you provide a remote_pubkey. For CURVE, the connection will fail if the pubkey does not match. - `ConnectionID` objects are now returned when connecting to a remote address; this object is then passed in to send/request/etc. to direct the message. For SN communication, ConnectionID's can be created implicitly from SN pubkey strings, so the existing interface of `lmq.send(pubkey, ...)` will still work in most cases. - A ConnectionID is now passed to the ConnectSuccess and ConnectFailure callbacks. This can be used to uniquely identify which connection succeeded or failed, and can determine whether the remote is a service node (`.sn()`) and/or the pubkey (`.pubkey()`). (Obviously the service node status is only available when the client can do service node lookups, and the pubkey() is only non-empty for encrypted connections). --- lokimq/lokimq.cpp | 952 ++++++++++++++++++++++++---------------- lokimq/lokimq.h | 494 ++++++++++++++------- tests/test_batch.cpp | 4 - tests/test_commands.cpp | 24 +- tests/test_connect.cpp | 61 ++- tests/test_requests.cpp | 74 +++- 6 files changed, 1028 insertions(+), 581 deletions(-) diff --git a/lokimq/lokimq.cpp b/lokimq/lokimq.cpp index ceb626b..1b5bad0 100644 --- a/lokimq/lokimq.cpp +++ b/lokimq/lokimq.cpp @@ -26,9 +26,6 @@ constexpr char ZMQ_ADDR_ZAP[] = "inproc://zeromq.zap.01"; # define LMQ_TRACE(...) #endif -// This is the domain used for listening service nodes. -constexpr const char AUTH_DOMAIN_SN[] = "loki.sn"; - namespace { @@ -93,6 +90,7 @@ void send_message_parts(zmq::socket_t &sock, Container &&c) { /// Sends a message with an initial route. `msg` and `data` can be empty: if `msg` is empty then /// the msg frame will be an empty message; if `data` is empty then the data frame will be omitted. void send_routed_message(zmq::socket_t &socket, std::string route, std::string msg = {}, std::string data = {}) { + assert(!route.empty()); std::array msgs{{create_message(std::move(route))}}; if (!msg.empty()) msgs[1] = create_message(std::move(msg)); @@ -120,16 +118,11 @@ std::vector as_strings(const MessageContainer& msgs) { } // Returns a string view of the given message data. It's the caller's responsibility to keep the -// referenced message alive. +// referenced message alive. If you want a std::string instead just call `m.to_string()` string_view view(const zmq::message_t& m) { return {m.data(), m.size()}; } -// Like the above, but copies the data into a string. -std::string string(const zmq::message_t& m) { - return std::string{view(m)}; -} - // Builds a ZMTP metadata key-value pair. These will be available on every message from that peer. // Keys must start with X- and be <= 255 characters. std::string zmtp_metadata(string_view key, string_view value) { @@ -175,22 +168,39 @@ std::string to_string(AuthLevel a) { default: return "(unknown)"; } } +AuthLevel auth_from_string(string_view a) { + if (a == "none") return AuthLevel::none; + if (a == "basic") return AuthLevel::basic; + if (a == "admin") return AuthLevel::admin; + return AuthLevel::denied; +} -/// Extracts a pubkey and SN status from a zmq message properties. Throws on failure. -void extract_pubkey(zmq::message_t& msg, std::string& pubkey, bool& service_node) { - string_view pubkey_hex{msg.gets("User-Id")}; - if (pubkey_hex.size() != 64) - throw std::logic_error("bad user-id"); - assert(is_hex(pubkey_hex.begin(), pubkey_hex.end())); - pubkey.resize(32, 0); - from_hex(pubkey_hex.begin(), pubkey_hex.end(), pubkey.begin()); +/// Extracts a pubkey, SN status, and auth level from a zmq message received on a *listening* +/// socket. +std::tuple extract_metadata(zmq::message_t& msg) { + auto result = std::make_tuple(""s, false, AuthLevel::none); + try { + string_view pubkey_hex{msg.gets("User-Id")}; + if (pubkey_hex.size() != 64) + throw std::logic_error("bad user-id"); + assert(is_hex(pubkey_hex.begin(), pubkey_hex.end())); + auto& pubkey = std::get(result); + pubkey.resize(32, 0); + from_hex(pubkey_hex.begin(), pubkey_hex.end(), pubkey.begin()); + } catch (...) {} - service_node = false; try { string_view is_sn{msg.gets("X-SN")}; if (is_sn.size() == 1 && is_sn[0] == '1') - service_node = true; - } catch (...) { /* property not set, ignore */ } + std::get(result) = true; + } catch (...) {} + + try { + string_view auth_level{msg.gets("X-AuthLevel")}; + std::get(result) = auth_from_string(auth_level); + } catch (...) {} + + return result; } const char* peer_address(zmq::message_t& msg) { @@ -198,6 +208,14 @@ const char* peer_address(zmq::message_t& msg) { return "(unknown)"; } +void add_pollitem(std::vector& pollitems, zmq::socket_t& sock) { + pollitems.emplace_back(); + auto &p = pollitems.back(); + p.socket = static_cast(sock); + p.fd = 0; + p.events = ZMQ_POLLIN; +} + } // anonymous namespace @@ -219,13 +237,22 @@ void send_control(zmq::socket_t& sock, string_view cmd, std::string data) { } // namespace detail +std::ostream& operator<<(std::ostream& o, const ConnectionID& conn) { + if (!conn.pk.empty()) + return o << (conn.sn() ? "SN " : "non-SN authenticated remote ") << to_hex(conn.pk); + else + return o << "unauthenticated remote [" << conn.id << "]"; +} -void LokiMQ::add_pollitem(zmq::socket_t& sock) { - pollitems.emplace_back(); - auto &p = pollitems.back(); - p.socket = static_cast(sock); - p.fd = 0; - p.events = ZMQ_POLLIN; + +void LokiMQ::rebuild_pollitems() { + pollitems.clear(); + add_pollitem(pollitems, command); + add_pollitem(pollitems, workers_socket); + add_pollitem(pollitems, zap_auth); + + for (auto& s : connections) + add_pollitem(pollitems, s); } void LokiMQ::log_level(LogLevel level) { @@ -349,13 +376,11 @@ LokiMQ::LokiMQ( std::string pubkey_, std::string privkey_, bool service_node, - std::vector bind_, SNRemoteAddress lookup, - AllowFunc allow, Logger logger) : object_id{next_id++}, pubkey{std::move(pubkey_)}, privkey{std::move(privkey_)}, local_service_node{service_node}, - bind{std::move(bind_)}, peer_lookup{std::move(lookup)}, allow_connection{std::move(allow)}, logger{std::move(logger)}, - poll_remote_offset{poll_internal_size + (bind.empty() ? 0 : 1)} { + sn_lookup{std::move(lookup)}, logger{std::move(logger)} +{ LMQ_TRACE("Constructing listening LokiMQ, id=", object_id, ", this=", this); @@ -381,18 +406,18 @@ LokiMQ::LokiMQ( if (verify_pubkey != pubkey) throw std::invalid_argument("Invalid pubkey/privkey values given to LokiMQ construction: pubkey verification failed"); } - - // If we're not binding to anything then we don't listen, i.e. we can only establish outbound - // connections. Don't allow this if we are in service_node mode because, if we aren't - // listening, we are useless as a service node. - if (bind.empty() && service_node) - throw std::invalid_argument{"Cannot create a service node listener with no address(es) to bind"}; } void LokiMQ::start() { if (proxy_thread.joinable()) throw std::logic_error("Cannot call start() multiple times!"); + // If we're not binding to anything then we don't listen, i.e. we can only establish outbound + // connections. Don't allow this if we are in service_node mode because, if we aren't + // listening, we are useless as a service node. + if (bind.empty() && local_service_node) + throw std::invalid_argument{"Cannot create a service node listener with no address(es) to bind"}; + LMQ_LOG(info, "Initializing LokiMQ ", bind.empty() ? "remote-only" : "listener", " with pubkey ", to_hex(pubkey)); // We bind `command` here so that the `get_control_socket()` below is always connecting to a @@ -424,7 +449,7 @@ void LokiMQ::worker_thread(unsigned int index) { LMQ_LOG(debug, "New worker thread ", worker_id, " started"); sock.connect(SN_ADDR_WORKERS); - Message message{*this}; + Message message{*this, 0}; std::vector parts; run_info& run = workers[index]; // This contains our first job, and will be updated later with subsequent jobs @@ -439,11 +464,11 @@ void LokiMQ::worker_thread(unsigned int index) { run.batch->job_completion(); } } else { - message.pubkey = run.pubkey; - message.service_node = run.service_node; + message.conn = run.conn; + message.route = run.conn_route; message.data.clear(); - LMQ_TRACE("Got incoming command from pubkey ", to_hex(message.pubkey), " (SN=", (int) message.service_node, ")"); + LMQ_TRACE("Got incoming command from ", message.conn, message.route.empty() ? "(outgoing)" : "(incoming)"); if (run.callback->second /*is_request*/) { message.reply_tag = {run.data_parts[0].data(), run.data_parts[0].size()}; @@ -457,43 +482,6 @@ void LokiMQ::worker_thread(unsigned int index) { LMQ_TRACE("worker thread ", worker_id, " invoking ", run.command, " callback with ", message.data.size(), " message parts"); run.callback->first(message); } - - /* - * FIXME: BYE should be handled by the proxy thread, not the worker. - */ - /* - if (msg.command == "BYE") { - LMQ_LOG(info, "peer asked us to disconnect"); - detail::send_control(get_control_socket(), "DISCONNECT", msg.pubkey); - continue; - } - */ - - /* FIXME: this lookup and auth check belongs in the proxy */ - /* - auto cmdit = commands.find(msg.command); - if (cmdit == commands.end()) { - LMQ_LOG(warn, worker_id, " received unknown command '", msg.command, "' from " << - (msg.sn ? "SN " : "non-SN ") << to_hex(msg.pubkey)); - continue; - } - - auto cmd_type = cmdit->second.second; - const bool command_accepted = ( - cmd_type == command_type::response ? msg.sn : - cmd_type == command_type::quorum ? msg.sn && is_service_node() : - cmd_type == command_type::public_ ? is_service_node() : - false); - if (!command_accepted) { - // If they aren't valid, tell them so that they can disconnect (and attempt to reconnect later with appropriate authentication) - LMQ_LOG(warn, worker_id, "/", object_id, " received disallowed ", cmd_type, " command ", msg.command << - " from " << (msg.sn ? "non-" : "") << "SN remote " << to_hex(msg.pubkey) << "; replying with a BYE"); - send(msg.pubkey, "BYE", send_option::incoming{}); - detail::send_control(get_control_socket(), "DISCONNECT", msg.pubkey); - continue; - } - - */ } catch (const bt_deserialize_invalid& e) { LMQ_LOG(warn, worker_id, " deserialization failed: ", e.what(), "; ignoring request"); @@ -543,7 +531,7 @@ void LokiMQ::worker_thread(unsigned int index) { void LokiMQ::proxy_quit() { LMQ_LOG(debug, "Received quit command, shutting down proxy thread"); - assert(std::none_of(workers.begin(), workers.end(), [](auto& worker) { return worker.thread.joinable(); })); + assert(std::none_of(workers.begin(), workers.end(), [](auto& worker) { return worker.worker_thread.joinable(); })); command.setsockopt(ZMQ_LINGER, 0); command.close(); @@ -555,59 +543,51 @@ void LokiMQ::proxy_quit() { } workers_socket.close(); int linger = std::chrono::milliseconds{CLOSE_LINGER}.count(); - if (listener.connected()) { - listener.setsockopt(ZMQ_LINGER, linger); - listener.close(); - } - for (auto &r : remotes) - r.second.setsockopt(ZMQ_LINGER, linger); - remotes.clear(); - peers.clear(); + for (auto& s : connections) + s.setsockopt(ZMQ_LINGER, linger); + connections.clear(); + peers.clear(); LMQ_LOG(debug, "Proxy thread teardown complete"); } void LokiMQ::setup_outgoing_socket(zmq::socket_t& socket, string_view remote_pubkey) { - // FIXME: not passing a remote_pubkey is a problem - if (!remote_pubkey.empty()) + if (!remote_pubkey.empty()) { socket.setsockopt(ZMQ_CURVE_SERVERKEY, remote_pubkey.data(), remote_pubkey.size()); - socket.setsockopt(ZMQ_CURVE_PUBLICKEY, pubkey.data(), pubkey.size()); - socket.setsockopt(ZMQ_CURVE_SECRETKEY, privkey.data(), privkey.size()); + socket.setsockopt(ZMQ_CURVE_PUBLICKEY, pubkey.data(), pubkey.size()); + socket.setsockopt(ZMQ_CURVE_SECRETKEY, privkey.data(), privkey.size()); + } socket.setsockopt(ZMQ_HANDSHAKE_IVL, (int) HANDSHAKE_TIME.count()); socket.setsockopt(ZMQ_MAXMSGSIZE, MAX_MSG_SIZE); socket.setsockopt(ZMQ_ROUTING_ID, pubkey.data(), pubkey.size()); } std::pair -LokiMQ::proxy_connect_sn(string_view remote_sv, string_view connect_hint, bool optional, bool incoming_only, std::chrono::milliseconds keep_alive) { - std::string remote{remote_sv}; - auto &peer = peers[remote]; // We may auto-vivify here, but that's okay; it'll get cleaned up in idle_expiry if no connection gets established - - std::pair result = {nullptr, ""s}; - - bool outgoing = false; - if (peer.outgoing >= 0 && !incoming_only) { - result.first = &remotes[peer.outgoing].second; - outgoing = true; - } else if (!peer.incoming.empty() && listener.connected()) { - result.first = &listener; - result.second = peer.incoming; +LokiMQ::proxy_connect_sn(string_view remote, string_view connect_hint, bool optional, bool incoming_only, std::chrono::milliseconds keep_alive) { + ConnectionID remote_cid{remote}; + auto its = peers.equal_range(remote_cid); + peer_info* peer = nullptr; + for (auto it = its.first; it != its.second; ++it) { + if (incoming_only && it->second.route.empty()) + continue; // outgoing connection but we were asked to only use incoming connections + peer = &it->second; + break; } - if (result.first) { + if (peer) { LMQ_TRACE("proxy asked to connect to ", to_hex(remote), "; reusing existing connection"); - if (outgoing) { - if (peer.idle_expiry < keep_alive) { + if (peer->route.empty() /* == outgoing*/) { + if (peer->idle_expiry < keep_alive) { LMQ_LOG(debug, "updating existing outgoing peer connection idle expiry time from ", - peer.idle_expiry.count(), "ms to ", keep_alive.count(), "ms"); - peer.idle_expiry = keep_alive; + peer->idle_expiry.count(), "ms to ", keep_alive.count(), "ms"); + peer->idle_expiry = keep_alive; } - peer.activity(); + peer->activity(); } - return result; + return {&connections[peer->conn_index], peer->route}; } else if (optional || incoming_only) { - LMQ_LOG(debug, "proxy asked for optional or incoming connection, but no appropriate connection exists so cancelling connection attempt"); - return result; + LMQ_LOG(debug, "proxy asked for optional or incoming connection, but no appropriate connection exists so aborting connection attempt"); + return {nullptr, ""s}; } // No connection so establish a new one @@ -621,43 +601,51 @@ LokiMQ::proxy_connect_sn(string_view remote_sv, string_view connect_hint, bool o } else { addr = std::string{connect_hint}; if (addr.empty()) - addr = peer_lookup(remote); + addr = sn_lookup(remote); else LMQ_LOG(debug, "using connection hint ", connect_hint); if (addr.empty()) { LMQ_LOG(error, "peer lookup failed for ", to_hex(remote)); - return result; + return {nullptr, ""s}; } } - LMQ_LOG(debug, to_hex(pubkey), " connecting to ", addr, " to reach ", to_hex(remote)); + LMQ_LOG(debug, to_hex(pubkey), " (me) connecting to ", addr, " to reach ", to_hex(remote)); zmq::socket_t socket{context, zmq::socket_type::dealer}; setup_outgoing_socket(socket, remote); socket.connect(addr); - peer.idle_expiry = keep_alive; + peer_info p{}; + p.service_node = true; + p.pubkey = std::string{remote}; + p.conn_index = connections.size(); + p.idle_expiry = keep_alive; + p.activity(); + peers.emplace(std::move(remote_cid), std::move(p)); + connections.push_back(std::move(socket)); - add_pollitem(socket); - peer.outgoing = remotes.size(); - remotes.emplace_back(remote, std::move(socket)); - peer.service_node = true; - peer.activity(); - - result.first = &remotes.back().second; - return result; + return {&connections.back(), ""s}; } -std::pair LokiMQ::proxy_connect_sn(bt_dict &&data) { - auto remote_pubkey = data.at("pubkey").get(); - std::chrono::milliseconds keep_alive{get_int(data.at("keep-alive"))}; - std::string hint; - auto hint_it = data.find("hint"); - if (hint_it != data.end()) - hint = data.at("hint").get(); +std::pair LokiMQ::proxy_connect_sn(bt_dict_consumer data) { + string_view hint, remote_pk; + std::chrono::milliseconds keep_alive; + bool optional = false, incoming_only = false; - bool optional = data.count("optional"), incoming = data.count("incoming"); + // Alphabetical order + if (data.skip_until("hint")) + hint = data.consume_string(); + if (data.skip_until("incoming")) + incoming_only = data.consume_integer(); + if (data.skip_until("keep-alive")) + keep_alive = std::chrono::milliseconds{data.consume_integer()}; + if (data.skip_until("optional")) + optional = data.consume_integer(); + if (!data.skip_until("pubkey")) + throw std::runtime_error("Internal error: Invalid proxy_connect_sn command; pubkey missing"); + remote_pk = data.consume_string(); - return proxy_connect_sn(remote_pubkey, hint, optional, incoming, keep_alive); + return proxy_connect_sn(remote_pk, hint, optional, incoming_only, keep_alive); } void LokiMQ::proxy_send(bt_dict_consumer data) { @@ -667,8 +655,20 @@ void LokiMQ::proxy_send(bt_dict_consumer data) { bool optional = false; bool incoming = false; bool request = false; + bool have_conn_id = false; + ConnectionID conn_id; + string_view conn_route; + std::string request_tag; std::unique_ptr request_cbptr; + if (data.skip_until("conn_id")) { + conn_id.id = data.consume_integer(); + if (conn_id.id == -1) + throw std::runtime_error("Invalid error: invalid conn_id value (-1)"); + have_conn_id = true; + } + if (data.skip_until("conn_route")) + conn_route = data.consume_string_view(); if (data.skip_until("hint")) hint = data.consume_string_view(); if (data.skip_until("incoming")) @@ -677,9 +677,14 @@ void LokiMQ::proxy_send(bt_dict_consumer data) { keep_alive = std::chrono::milliseconds{data.consume_integer()}; if (data.skip_until("optional")) optional = data.consume_integer(); - if (!data.skip_until("pubkey")) - throw std::runtime_error("Internal error: Invalid proxy send command; pubkey missing"); - string_view remote_pubkey = data.consume_string_view(); + if (data.skip_until("pubkey")) { + if (have_conn_id) + throw std::runtime_error("Internal error: Invalid proxy send command; conn_id and pubkey are exclusive"); + conn_id.pk = data.consume_string(); + conn_id.id = ConnectionID::SN_ID; + } else if (!have_conn_id) + throw std::runtime_error("Internal error: Invalid proxy send command; pubkey or conn_id missing"); + if (data.skip_until("request")) request = data.consume_integer(); if (request) { @@ -694,13 +699,36 @@ void LokiMQ::proxy_send(bt_dict_consumer data) { throw std::runtime_error("Internal error: Invalid proxy send command; send parts missing"); bt_list_consumer send = data.consume_list_consumer(); - auto sock_route = proxy_connect_sn(remote_pubkey, hint, optional, incoming, keep_alive); - if (!sock_route.first) { - if (optional) - LMQ_LOG(debug, "Not sending: send is optional and no connection to ", to_hex(remote_pubkey), " is currently established"); - else - LMQ_LOG(error, "Unable to send to ", to_hex(remote_pubkey), ": no connection could be established"); - return; + zmq::socket_t *send_to; + std::string routing_prefix; + if (conn_id.sn()) { + auto sock_route = proxy_connect_sn(conn_id.pk, hint, optional, incoming, keep_alive); + if (!sock_route.first) { + if (optional) + LMQ_LOG(debug, "Not sending: send is optional and no connection to ", + to_hex(conn_id.pk), " is currently established"); + else + LMQ_LOG(error, "Unable to send to ", to_hex(conn_id.pk), ": no connection address found"); + return; + } + send_to = sock_route.first; + routing_prefix = std::move(sock_route.second); + } else if (!conn_route.empty()) { // incoming non-SN connection + auto it = incoming_conn_index.find(conn_id); + if (it == incoming_conn_index.end()) { + LMQ_LOG(warn, "Unable to send to ", conn_id, ": incoming listening socket not found"); + return; + } + send_to = &connections[it->second]; + routing_prefix = std::string{conn_route}; + } else { + auto pr = peers.equal_range(conn_id); + if (pr.first == peers.end()) { + LMQ_LOG(warn, "Unable to send: connection id ", conn_id, " is not (or is no longer) a valid connection"); + return; + } + auto& peer = pr.first->second; + send_to = &connections[peer.conn_index]; } if (request) { @@ -710,44 +738,80 @@ void LokiMQ::proxy_send(bt_dict_consumer data) { } try { - send_message_parts(*sock_route.first, build_send_parts(send, sock_route.second)); + send_message_parts(*send_to, build_send_parts(send, routing_prefix)); } catch (const zmq::error_t &e) { - if (e.num() == EHOSTUNREACH && sock_route.first == &listener && !sock_route.second.empty()) { - // We *tried* to route via the incoming connection but it is no longer valid. Drop it, - // establish a new connection, and try again. - auto &peer = peers[std::string{remote_pubkey}]; - peer.incoming.clear(); // Don't worry about cleaning the map entry if outgoing is also < 0: that will happen at the next idle cleanup - LMQ_LOG(debug, "Could not route back to SN ", to_hex(remote_pubkey), " via listening socket; trying via new outgoing connection"); - return proxy_send(std::move(data)); + if (e.num() == EHOSTUNREACH && !routing_prefix.empty() /*= incoming conn*/) { + LMQ_LOG(debug, "Incoming connection is no longer valid; removing peer details"); + // Our incoming connection no longer exists; remove it from `peers`. + auto pr = peers.equal_range(conn_id); + if (pr.first != peers.end()) { + if (!conn_id.sn()) { + peers.erase(pr.first); + } else { + bool removed; + for (auto it = pr.first; it != pr.second; ) { + auto& peer = it->second; + if (peer.route == routing_prefix) { + peers.erase(it); + removed = true; + break; + } + } + // The incoming connection to the SN is no longer good, but we can retry because + // we may have another active connection with the SN (or may want to open one). + if (removed) { + LMQ_LOG(debug, "Retrying sending to SN ", to_hex(conn_id.pk), " using other sockets"); + return proxy_send(std::move(data)); + } + } + } } - LMQ_LOG(warn, "Unable to send message to remote SN ", to_hex(remote_pubkey), ": ", e.what()); + LMQ_LOG(warn, "Unable to send message to ", conn_id, ": ", e.what()); } } void LokiMQ::proxy_reply(bt_dict_consumer data) { - // NB: bt_dict_consumer goes in alphabetical order - data.skip_until("route"); - string_view route = data.consume_string(); - assert(!route.empty()); + bool have_conn_id = false; + ConnectionID conn_id{0}; + if (data.skip_until("conn_id")) { + conn_id.id = data.consume_integer(); + if (conn_id.id == -1) + throw std::runtime_error("Invalid error: invalid conn_id value (-1)"); + have_conn_id = true; + } + if (data.skip_until("pubkey")) { + if (have_conn_id) + throw std::runtime_error("Internal error: Invalid proxy send command; conn_id and pubkey are exclusive"); + conn_id.pk = data.consume_string(); + conn_id.id = ConnectionID::SN_ID; + } else if (!have_conn_id) + throw std::runtime_error("Internal error: Invalid proxy send command; pubkey or conn_id missing"); + if (!data.skip_until("send")) + throw std::runtime_error("Internal error: Invalid proxy reply command; send parts missing"); - if (!listener.connected()) { - // FIXME: this is wrong; we can reply to something even with no listener (e.g. if client - // says A, server replies B, client replies to that with C). - LMQ_LOG(error, "Internal error: proxy_reply called but that shouldn't be possible as we have no listener!"); + bt_list_consumer send = data.consume_list_consumer(); + + auto pr = peers.equal_range(conn_id); + if (pr.first == pr.second) { + LMQ_LOG(warn, "Unable to send tagged reply: the connection is no longer valid"); return; } - if (!data.skip_until("send")) - throw std::runtime_error("Internal error: Invalid proxy reply command; send parts missing"); - bt_list_consumer send = data.consume_list_consumer(); - - try { - send_message_parts(listener, build_send_parts(send, route)); - } catch (const zmq::error_t &err) { - if (err.num() == EHOSTUNREACH) { - LMQ_LOG(info, "Unable to send reply to incoming non-SN request: remote is no longer connected"); - } else { - LMQ_LOG(warn, "Unable to send reply to incoming non-SN request: ", err.what()); + // We try any connections until one works (for ordinary remotes there will be just one, but for + // SNs there might be one incoming and one outgoing). + for (auto it = pr.first; it != pr.second; ) { + try { + send_message_parts(connections[it->second.conn_index], build_send_parts(send, it->second.route)); + break; + } catch (const zmq::error_t &err) { + if (err.num() == EHOSTUNREACH) { + LMQ_LOG(info, "Unable to send reply to incoming non-SN request: remote is no longer connected"); + LMQ_LOG(debug, "Incoming connection is no longer valid; removing peer details"); + it = peers.erase(it); + } else { + LMQ_LOG(warn, "Unable to send reply to incoming non-SN request: ", err.what()); + ++it; + } } } } @@ -810,11 +874,12 @@ void LokiMQ::proxy_control_message(std::vector& parts) { auto ptrval = bt_deserialize(view(parts[2])); return proxy_batch(reinterpret_cast(ptrval)); } else if (cmd == "CONNECT_SN") { - proxy_connect_sn(bt_deserialize(view(parts[2]))); + proxy_connect_sn(view(parts[2])); return; } else if (cmd == "CONNECT_REMOTE") { - proxy_connect_remote(view(parts[2])); - return; + return proxy_connect_remote(view(parts[2])); + } else if (cmd == "DISCONNECT") { + return proxy_disconnect(view(parts[2])); } else if (cmd == "TIMER") { return proxy_timer(view(parts[2])); } @@ -829,7 +894,7 @@ void LokiMQ::proxy_control_message(std::vector& parts) { // connections once all workers are done. max_workers = 0; for (const auto &route : idle_workers) - route_control(workers_socket, workers[route].routing_id, "QUIT"); + route_control(workers_socket, workers[route].worker_routing_id, "QUIT"); idle_workers.clear(); return; } @@ -838,64 +903,50 @@ void LokiMQ::proxy_control_message(std::vector& parts) { " (" + std::to_string(parts.size()) + ")"); } - -void LokiMQ::proxy_close_remote(int index, bool linger) { - remotes[index].second.setsockopt(ZMQ_LINGER, linger ? std::chrono::milliseconds{CLOSE_LINGER}.count() : 0); - pollitems.erase(pollitems.begin() + poll_remote_offset + index); - remotes.erase(remotes.begin() + index); - assert(remotes.size() == pollitems.size() + poll_remote_offset); - - for (auto& p : peers) - if (p.second.outgoing > index) - --p.second.outgoing; - - for (auto& pc : pending_connects) { - auto& i = std::get(pc); - if (i > index) +template +void update_connection_indices(Container& c, size_t index, AccessIndex get_index) { + for (auto it = c.begin(); it != c.end(); ) { + size_t& i = get_index(*it); + if (index == i) { + it = c.erase(it); + continue; + } + if (index > i) --i; + ++it; } } -auto LokiMQ::proxy_close_outgoing(decltype(peers)::iterator it) -> decltype(it) { - auto &peer = *it; - auto &info = peer.second; +void LokiMQ::proxy_close_connection(size_t index, std::chrono::milliseconds linger) { + connections[index].setsockopt(ZMQ_LINGER, linger > 0ms ? linger.count() : 0); + pollitems_stale = true; + connections.erase(connections.begin() + index); - if (info.outgoing >= 0) { - proxy_close_remote(info.outgoing); - info.outgoing = -1; - } - - if (info.incoming.empty()) - // Neither incoming nor outgoing connections left, so erase the peer info - return peers.erase(it); - - return std::next(it); -} - -void LokiMQ::proxy_disconnect(const std::string &remote) { - auto it = peers.find(remote); - if (it == peers.end()) - return; - if (it->second.outgoing >= 0) - LMQ_LOG(debug, "Closing outgoing connection to ", to_hex(it->first)); - proxy_close_outgoing(it); + update_connection_indices(peers, index, + [](auto& p) -> size_t& { return p.second.conn_index; }); + update_connection_indices(pending_connects, index, + [](auto& pc) -> size_t& { return std::get(pc); }); + update_connection_indices(bind, index, + [](auto& b) -> size_t& { return b.second.index; }); + update_connection_indices(incoming_conn_index, index, + [](auto& oci) -> size_t& { return oci.second; }); + assert(index < conn_index_to_id.size()); + conn_index_to_id.erase(conn_index_to_id.begin() + index); } void LokiMQ::proxy_expire_idle_peers() { for (auto it = peers.begin(); it != peers.end(); ) { auto &info = it->second; - if (info.outgoing >= 0) { + if (info.outgoing()) { auto idle = info.last_activity - std::chrono::steady_clock::now(); if (idle <= info.idle_expiry) { ++it; continue; } - LMQ_LOG(info, "Closing outgoing connection to ", to_hex(it->first), ": idle timeout reached"); + LMQ_LOG(info, "Closing outgoing connection to ", it->first, ": idle timeout reached"); + proxy_close_connection(info.conn_index, CLOSE_LINGER); + it = peers.erase(it); } - - // Deliberately outside the above if: this *also* removes the peer from the map if if has - // neither an incoming or outgoing connection - it = proxy_close_outgoing(it); } } @@ -910,14 +961,15 @@ void LokiMQ::proxy_conn_cleanup() { auto now = std::chrono::steady_clock::now(); + // FIXME - check other outgoing connections to see if they died and if so purge them + // Check any pending outgoing connections for timeout for (auto it = pending_connects.begin(); it != pending_connects.end(); ) { auto& pc = *it; - if (std::get<1>(pc) < now) { - job([callback = std::move(std::get<3>(pc))] { callback("connection attempt timed out"); }); - int index = std::get<0>(pc); - it = pending_connects.erase(it); - proxy_close_remote(index, false /*linger*/); + if (std::get(pc) < now) { + job([cid = ConnectionID{std::get(pc)}, callback = std::move(std::get(pc))] { callback(cid, "connection attempt timed out"); }); + it = pending_connects.erase(it); // Don't let the below erase it (because it invalidates iterators) + proxy_close_connection(std::get(pc), CLOSE_LINGER); } else { ++it; } @@ -936,8 +988,24 @@ void LokiMQ::proxy_conn_cleanup() { } }; +void LokiMQ::listen_curve(std::string bind_addr, AllowFunc allow_connection) { + // TODO: there's no particular reason we can't start listening after starting up; just needs to + // be implemented. (But if we can start we'll probably also want to be able to stop, so it's + // more than just binding that needs implementing). + check_not_started(proxy_thread, "start listening"); + + bind.emplace_back(std::move(bind_addr), bind_data{true, std::move(allow_connection)}); +} + +void LokiMQ::listen_plain(std::string bind_addr, AllowFunc allow_connection) { + // TODO: As above. + check_not_started(proxy_thread, "start listening"); + + bind.emplace_back(std::move(bind_addr), bind_data{false, std::move(allow_connection)}); +} + void LokiMQ::proxy_loop() { - zmq::socket_t zap_auth{context, zmq::socket_type::rep}; + zap_auth.setsockopt(ZMQ_LINGER, 0); zap_auth.bind(ZMQ_ADDR_ZAP); @@ -965,38 +1033,38 @@ void LokiMQ::proxy_loop() { if (!workers.empty()) throw std::logic_error("Internal error: proxy thread started with active worker threads"); - add_pollitem(command); - add_pollitem(workers_socket); - add_pollitem(zap_auth); - assert(pollitems.size() == poll_internal_size); + for (size_t i = 0; i < bind.size(); i++) { + auto& b = bind[i].second; + zmq::socket_t listener{context, zmq::socket_type::router}; - if (!bind.empty()) { - // Set up the public tcp listener(s): - listener = {context, zmq::socket_type::router}; - listener.setsockopt(ZMQ_ZAP_DOMAIN, AUTH_DOMAIN_SN, sizeof(AUTH_DOMAIN_SN)-1); - listener.setsockopt(ZMQ_CURVE_SERVER, 1); - listener.setsockopt(ZMQ_CURVE_PUBLICKEY, pubkey.data(), pubkey.size()); - listener.setsockopt(ZMQ_CURVE_SECRETKEY, privkey.data(), privkey.size()); + std::string auth_domain = bt_serialize(i); + listener.setsockopt(ZMQ_ZAP_DOMAIN, auth_domain.c_str(), auth_domain.size()); + if (b.curve) { + listener.setsockopt(ZMQ_CURVE_SERVER, 1); + listener.setsockopt(ZMQ_CURVE_PUBLICKEY, pubkey.data(), pubkey.size()); + listener.setsockopt(ZMQ_CURVE_SECRETKEY, privkey.data(), privkey.size()); + } listener.setsockopt(ZMQ_HANDSHAKE_IVL, (int) HANDSHAKE_TIME.count()); listener.setsockopt(ZMQ_MAXMSGSIZE, MAX_MSG_SIZE); listener.setsockopt(ZMQ_ROUTER_HANDOVER, 1); listener.setsockopt(ZMQ_ROUTER_MANDATORY, 1); - for (const auto &b : bind) { - LMQ_LOG(info, "LokiMQ listening on ", b); - listener.bind(b); - } + listener.bind(bind[i].first); + LMQ_LOG(info, "LokiMQ listening on ", bind[i].first); - // Also add an internal connection to self so that calling code can avoid needing to - // special-case rare situations where we are supposed to talk to a quorum member that happens to - // be ourselves (which can happen, for example, with cross-quoum Blink communication) - // FIXME: not working - //listener.bind(SN_ADDR_SELF); - - add_pollitem(listener); + connections.push_back(std::move(listener)); + auto conn_id = next_conn_id++; + conn_index_to_id.push_back(conn_id); + incoming_conn_index[conn_id] = connections.size() - 1; + b.index = connections.size() - 1; } + pollitems_stale = true; - assert(pollitems.size() == poll_remote_offset); + // Also add an internal connection to self so that calling code can avoid needing to + // special-case rare situations where we are supposed to talk to a quorum member that happens to + // be ourselves (which can happen, for example, with cross-quoum Blink communication) + // FIXME: not working + //listener.bind(SN_ADDR_SELF); if (!timers) timers.reset(zmq_timers_new()); @@ -1016,7 +1084,7 @@ void LokiMQ::proxy_loop() { while (true) { std::chrono::milliseconds poll_timeout; if (max_workers == 0) { // Will be 0 only if we are quitting - if (std::none_of(workers.begin(), workers.end(), [](auto &w) { return w.thread.joinable(); })) { + if (std::none_of(workers.begin(), workers.end(), [](auto &w) { return w.worker_thread.joinable(); })) { // All the workers have finished, so we can finish shutting down return proxy_quit(); } @@ -1029,6 +1097,10 @@ void LokiMQ::proxy_loop() { proxy_skip_poll = false; else { LMQ_TRACE("polling for new messages"); + + if (pollitems_stale) + rebuild_pollitems(); + // We poll the control socket and worker socket for any incoming messages. If we have // available worker room then also poll incoming connections and outgoing connections // for messages to forward to a worker. Otherwise, we just look for a control message @@ -1052,7 +1124,7 @@ void LokiMQ::proxy_loop() { // Handle any zap authentication LMQ_TRACE("processing zap requests"); - process_zap_requests(zap_auth); + process_zap_requests(); // See if we can drain anything from the current queue before we potentially add to it // below. @@ -1064,7 +1136,7 @@ void LokiMQ::proxy_loop() { // We round-robin connections when pulling off pending messages one-by-one rather than // pulling off all messages from one connection before moving to the next; thus in cases of // contention we end up fairly distributing. - const int num_sockets = remotes.size() + listener.connected(); + const int num_sockets = connections.size(); std::queue queue_index; for (int i = 0; i < num_sockets; i++) queue_index.push(i); @@ -1072,7 +1144,7 @@ void LokiMQ::proxy_loop() { for (parts.clear(); !queue_index.empty() && workers.size() < max_workers; parts.clear()) { size_t i = queue_index.front(); queue_index.pop(); - auto &sock = listener.connected() ? (i == 0 ? listener : remotes[i - 1].second) : remotes[i].second; + auto& sock = connections[i]; if (!recv_message_parts(sock, parts, zmq::recv_flags::dontwait)) continue; @@ -1195,46 +1267,24 @@ void LokiMQ::proxy_worker_message(std::vector& parts) { idle_workers.push_back(worker_id); } } else if (cmd == "QUITTING") { - workers[worker_id].thread.join(); + workers[worker_id].worker_thread.join(); LMQ_LOG(debug, "Worker ", route, " exited normally"); } else { LMQ_LOG(error, "Worker ", route, " sent unknown control message: `", cmd, "'"); } } -decltype(LokiMQ::peers)::iterator LokiMQ::proxy_lookup_peer(int conn_index, zmq::message_t& msg) { - bool is_outgoing_conn = !listener.connected() || conn_index > 0; - - std::string pubkey; - bool service_node = false; - if (!is_outgoing_conn) { - try { - extract_pubkey(msg, pubkey, service_node); - } catch (...) { - LMQ_LOG(error, "Internal error: message metadata not set or invalid; dropping message"); - throw std::out_of_range("message pubkey metadata invalid"); - } - } else { - pubkey = remotes[conn_index - listener.connected()].first; - } - - auto it = peers.find(pubkey); - if (it == peers.end()) - it = peers.emplace(std::move(pubkey), peer_info{}).first; - it->second.service_node |= service_node; - return it; -} - // Return true if we recognized/handled the builtin command (even if we reject it for whatever // reason) -bool LokiMQ::proxy_handle_builtin(int conn_index, std::vector& parts) { +bool LokiMQ::proxy_handle_builtin(size_t conn_index, std::vector& parts) { + bool outgoing = connections[conn_index].getsockopt(ZMQ_TYPE) == ZMQ_DEALER; + string_view route, cmd; - bool is_outgoing_conn = !listener.connected() || conn_index > 0; - if (parts.size() < (is_outgoing_conn ? 1 : 2)) { + if (parts.size() < (outgoing ? 1 : 2)) { LMQ_LOG(warn, "Received empty message; ignoring"); return true; } - if (is_outgoing_conn) { + if (outgoing) { cmd = view(parts[0]); } else { route = view(parts[0]); @@ -1243,7 +1293,7 @@ bool LokiMQ::proxy_handle_builtin(int conn_index, std::vector& p LMQ_TRACE("Checking for builtins: ", cmd, " from ", peer_address(parts.back())); if (cmd == "REPLY") { - size_t tag_pos = (is_outgoing_conn ? 1 : 2); + size_t tag_pos = (outgoing ? 1 : 2); if (parts.size() <= tag_pos) { LMQ_LOG(warn, "Received REPLY without a reply tag; ignoring"); return true; @@ -1265,41 +1315,51 @@ bool LokiMQ::proxy_handle_builtin(int conn_index, std::vector& p } return true; } else if (cmd == "HI") { - if (is_outgoing_conn) { + if (outgoing) { LMQ_LOG(warn, "Got invalid 'HI' message on an outgoing connection; ignoring"); return true; } LMQ_LOG(info, "Incoming client from ", peer_address(parts.back()), " sent HI, replying with HELLO"); - send_routed_message(listener, std::string{route}, "HELLO"); + send_routed_message(connections[conn_index], std::string{route}, "HELLO"); return true; } else if (cmd == "HELLO") { - if (!is_outgoing_conn) { + if (!outgoing) { LMQ_LOG(warn, "Got invalid 'HELLO' message on an incoming connection; ignoring"); return true; } auto it = std::find_if(pending_connects.begin(), pending_connects.end(), - [&](auto& pc) { return std::get<0>(pc) == conn_index; }); + [&](auto& pc) { return std::get(pc) == conn_index; }); if (it == pending_connects.end()) { LMQ_LOG(warn, "Got invalid 'HELLO' message on an already handshaked incoming connection; ignoring"); return true; } + auto& pc = *it; + auto pit = peers.find(std::get(pc)); + if (pit == peers.end()) { + LMQ_LOG(warn, "Got invalid 'HELLO' message with invalid conn_id; ignoring"); + return true; + } + LMQ_LOG(info, "Got initial HELLO server response from ", peer_address(parts.back())); - size_t pksize = 32; - if (listener.connected()) - conn_index--; // convert to `remotes` index. - remotes[conn_index].first.resize(pksize); - remotes[conn_index].second.getsockopt(ZMQ_CURVE_SERVERKEY, &remotes[conn_index].first[0], &pksize); - auto &peer = peers[remotes[conn_index].first]; - peer.idle_expiry = 365 * 24h; - peer.outgoing = conn_index; - peer.service_node = false; - peer.activity(); - proxy_schedule_job([on_success=std::move(std::get<2>(*it)), pk=remotes[conn_index].first] { on_success(std::move(pk)); }); + proxy_schedule_job([on_success=std::move(std::get(pc)), + conn=conn_index_to_id[conn_index]] { + on_success(conn); + }); pending_connects.erase(it); return true; } else if (cmd == "BYE") { - auto pit = proxy_lookup_peer(conn_index, parts.front()); - proxy_close_outgoing(pit); + if (outgoing) { + std::string pk; + bool sn; + AuthLevel a; + std::tie(pk, sn, a) = extract_metadata(parts.back()); + ConnectionID conn = sn ? ConnectionID{std::move(pk)} : conn_index_to_id[conn_index]; + LMQ_LOG(info, "BYE command received; disconnecting from ", conn); + proxy_disconnect(conn, 1s); + } else { + LMQ_LOG(warn, "Got invalid 'BYE' command on an incoming socket; ignoring"); + } + return true; } else if (cmd == "FORBIDDEN" || cmd == "NOT_A_SERVICE_NODE") { @@ -1315,7 +1375,7 @@ LokiMQ::run_info& LokiMQ::get_idle_worker() { workers.emplace_back(); auto& r = workers.back(); r.worker_id = id; - r.routing_id = "w" + std::to_string(id); + r.worker_routing_id = "w" + std::to_string(id); return r; } size_t id = idle_workers.back(); @@ -1339,8 +1399,8 @@ LokiMQ::run_info& LokiMQ::run_info::operator=(pending_command&& pending) { is_batch_job = false; cat = &pending.cat; command = std::move(pending.command); - pubkey = std::move(pending.pubkey); - service_node = pending.service_node; + conn = std::move(pending.conn); + conn_route = std::move(pending.conn_route); data_parts = std::move(pending.data_parts); callback = pending.callback; return *this; @@ -1354,10 +1414,10 @@ LokiMQ::run_info& LokiMQ::run_info::operator=(batch_job&& bj) { void LokiMQ::proxy_run_worker(run_info& run) { - if (!run.thread.joinable()) - run.thread = std::thread{&LokiMQ::worker_thread, this, run.worker_id}; + if (!run.worker_thread.joinable()) + run.worker_thread = std::thread{&LokiMQ::worker_thread, this, run.worker_id}; else - send_routed_message(workers_socket, run.routing_id, "RUN"); + send_routed_message(workers_socket, run.worker_routing_id, "RUN"); } @@ -1386,26 +1446,60 @@ void LokiMQ::proxy_process_queue() { } void LokiMQ::proxy_to_worker(size_t conn_index, std::vector& parts) { - auto pit = proxy_lookup_peer(conn_index, parts.back()); - string_view pubkey = pit->first; - auto& peer_info = pit->second; + bool outgoing = connections[conn_index].getsockopt(ZMQ_TYPE) == ZMQ_DEALER; - bool is_outgoing_conn = !listener.connected() || conn_index > 0; - size_t command_part_index = is_outgoing_conn ? 0 : 1; + peer_info tmp_peer; + tmp_peer.conn_index = conn_index; + if (!outgoing) tmp_peer.route = parts[0].to_string(); + peer_info* peer = nullptr; + if (outgoing) { + auto it = peers.find(conn_index_to_id[conn_index]); + if (it == peers.end()) { + LMQ_LOG(warn, "Internal error: connection index not found"); + return; + } + peer = &it->second; + } else { + std::tie(tmp_peer.pubkey, tmp_peer.service_node, tmp_peer.auth_level) = extract_metadata(parts.back()); + if (tmp_peer.service_node) { + // It's a service node so we should have a peer_info entry; see if we can find one with + // the same route, and if not, add one. + auto pr = peers.equal_range(tmp_peer.pubkey); + for (auto it = pr.first; it != pr.second; ++it) { + if (it->second.route == tmp_peer.route) { + peer = &it->second; + // Upgrade permissions in case we have something higher on the socket + peer->service_node |= tmp_peer.service_node; + if (tmp_peer.auth_level > peer->auth_level) + peer->auth_level = tmp_peer.auth_level; + break; + } + } + if (!peer) { + peer = &peers.emplace(ConnectionID{tmp_peer.pubkey}, std::move(tmp_peer))->second; + } + } else { + // Incoming, non-SN connection: we don't store a peer_info for this, so just use the + // temporary one + peer = &tmp_peer; + } + } + + size_t command_part_index = outgoing ? 0 : 1; std::string command = parts[command_part_index].to_string(); auto cat_call = get_command(command); if (!cat_call.first) { - if (is_outgoing_conn) - send_direct_message(remotes[conn_index - listener.connected()].second, "UNKNOWNCOMMAND", command); + if (outgoing) + send_direct_message(connections[conn_index], "UNKNOWNCOMMAND", command); else - send_routed_message(listener, std::string{pubkey}, "UNKNOWNCOMMAND", command); + send_routed_message(connections[conn_index], peer->route, "UNKNOWNCOMMAND", command); return; } auto& category = *cat_call.first; - if (!proxy_check_auth(pubkey, conn_index, peer_info, command, category, parts.back())) + if (!proxy_check_auth(conn_index, outgoing, *peer, command, category, parts.back())) return; // Steal any data message parts @@ -1424,7 +1518,8 @@ void LokiMQ::proxy_to_worker(size_t conn_index, std::vector& par } LMQ_LOG(debug, "No available free workers, queuing ", command, " for later"); - pending_commands.emplace_back(category, std::move(command), std::move(data_parts), cat_call.second, std::string{pubkey}, peer_info.service_node); + ConnectionID conn{peer->service_node ? ConnectionID::SN_ID : conn_index_to_id[conn_index].id, peer->pubkey}; + pending_commands.emplace_back(category, std::move(command), std::move(data_parts), cat_call.second, std::move(conn), tmp_peer.route); category.queued++; return; } @@ -1438,65 +1533,68 @@ void LokiMQ::proxy_to_worker(size_t conn_index, std::vector& par run.is_batch_job = false; run.cat = &category; run.command = std::move(command); - run.pubkey = std::string{pubkey}; - run.service_node = peer_info.service_node; + run.conn.pk = peer->pubkey; + if (peer->service_node) { + run.conn.id = ConnectionID::SN_ID; + run.conn_route.clear(); + } else { + run.conn.id = conn_index_to_id[conn_index].id; + if (outgoing) + run.conn_route.clear(); + else + run.conn_route = tmp_peer.route; + } + run.data_parts = std::move(data_parts); run.callback = cat_call.second; - if (is_outgoing_conn) { - peer_info.activity(); // outgoing connection activity, pump the activity timer - } else { - // incoming connection; the route is the first argument. Update the peer route info in case - // it has changed (e.g. new connection). - auto route = view(parts[0]); - if (string_view(peer_info.incoming) != route) - peer_info.incoming = std::string{route}; - } + if (outgoing) + peer->activity(); // outgoing connection activity, pump the activity timer - LMQ_TRACE("Forwarding incoming ", run.command, " from ", run.service_node ? "SN " : "non-SN ", - to_hex(run.pubkey), " @ ", peer_address(parts.back()), " to worker ", run.routing_id); + LMQ_TRACE("Forwarding incoming ", run.command, " from ", run.conn, " @ ", peer_address(parts.back()), + " to worker ", run.worker_routing_id); proxy_run_worker(run); category.active_threads++; } -bool LokiMQ::proxy_check_auth(string_view pubkey, size_t conn_index, const peer_info& peer, const std::string& command, const category& cat, zmq::message_t& msg) { - bool is_outgoing_conn = !listener.connected() || conn_index > 0; +bool LokiMQ::proxy_check_auth(size_t conn_index, bool outgoing, const peer_info& peer, + const std::string& command, const category& cat, zmq::message_t& msg) { std::string reply; if (peer.auth_level < cat.access.auth) { - LMQ_LOG(warn, "Access denied to ", command, " for peer ", to_hex(pubkey), " @ ", peer_address(msg), + LMQ_LOG(warn, "Access denied to ", command, " for peer [", to_hex(peer.pubkey), "]/", peer_address(msg), ": peer auth level ", to_string(peer.auth_level), " < ", to_string(cat.access.auth)); reply = "FORBIDDEN"; } else if (cat.access.local_sn && !local_service_node) { - LMQ_LOG(warn, "Access denied to ", command, " for peer ", to_hex(pubkey), " @ ", peer_address(msg), + LMQ_LOG(warn, "Access denied to ", command, " for peer [", to_hex(peer.pubkey), "]/", peer_address(msg), ": that command is only available when this LokiMQ is running in service node mode"); reply = "NOT_A_SERVICE_NODE"; } else if (cat.access.remote_sn && !peer.service_node) { - LMQ_LOG(warn, "Access denied to ", command, " for peer ", to_hex(pubkey), " @ ", peer_address(msg), + LMQ_LOG(warn, "Access denied to ", command, " for peer [", to_hex(peer.pubkey), "]/", peer_address(msg), ": remote is not recognized as a service node"); // Disconnect: we don't think the remote is a SN, but it issued a command only SNs should be // issuing. Drop the connection; if the remote has something important to relay it will // reconnect, at which point we will reassess the SN status on the new incoming connection. - if (!is_outgoing_conn) - send_routed_message(listener, std::string{pubkey}, "BYE"); + if (outgoing) + proxy_disconnect(peer.service_node ? ConnectionID{peer.pubkey} : conn_index_to_id[conn_index], 1s); else - proxy_disconnect(std::string{pubkey}); + send_routed_message(connections[conn_index], peer.route, "BYE"); return false; } if (reply.empty()) return true; - if (is_outgoing_conn) - send_direct_message(remotes[conn_index - listener.connected()].second, std::move(reply), command); + if (outgoing) + send_direct_message(connections[conn_index], std::move(reply), command); else - send_routed_message(listener, std::string{pubkey}, std::move(reply), command); - return true; + send_routed_message(connections[conn_index], peer.route, std::move(reply), command); + return false; } -void LokiMQ::process_zap_requests(zmq::socket_t &zap_auth) { +void LokiMQ::process_zap_requests() { for (std::vector frames; recv_message_parts(zap_auth, frames, zmq::recv_flags::dontwait); frames.clear()) { #ifndef NDEBUG if (log_level() >= LogLevel::trace) { @@ -1549,50 +1647,60 @@ void LokiMQ::process_zap_requests(zmq::socket_t &zap_auth) { LMQ_LOG(error, "Bad ZAP authentication request: version != 1.0 or invalid ZAP message parts"); status_code = "500"; status_text = "Internal error: invalid auth request"; - } - else if (frames.size() != 7 || view(frames[5]) != "CURVE") { - LMQ_LOG(error, "Bad ZAP authentication request: invalid CURVE authentication request"); - status_code = "500"; - status_text = "Invalid CURVE authentication request\n"; - } - else if (frames[6].size() != 32) { - LMQ_LOG(error, "Bad ZAP authentication request: invalid request pubkey"); - status_code = "500"; - status_text = "Invalid public key size for CURVE authentication"; - } - else { - auto domain = view(frames[2]); - if (domain != AUTH_DOMAIN_SN) { - LMQ_LOG(error, "Bad ZAP authentication request: invalid auth domain '", domain, "'"); + } else { + auto auth_domain = view(frames[2]); + size_t bind_id = (size_t) -1; + try { + bind_id = bt_deserialize(view(frames[2])); + } catch (...) {} + + if (bind_id >= bind.size()) { + LMQ_LOG(error, "Bad ZAP authentication request: invalid auth domain '", auth_domain, "'"); status_code = "400"; - status_text = "Unknown authentication domain: " + std::string{domain}; + status_text = "Unknown authentication domain: " + std::string{auth_domain}; + } else if (bind[bind_id].second.curve + ? !(frames.size() == 7 && view(frames[5]) == "CURVE") + : !(frames.size() == 6 && view(frames[5]) == "NULL")) { + LMQ_LOG(error, "Bad ZAP authentication request: invalid ", + bind[bind_id].second.curve ? "CURVE" : "NULL", " authentication request"); + status_code = "500"; + status_text = "Invalid authentication request mechanism"; + } else if (bind[bind_id].second.curve && frames[6].size() != 32) { + LMQ_LOG(error, "Bad ZAP authentication request: invalid request pubkey"); + status_code = "500"; + status_text = "Invalid public key size for CURVE authentication"; } else { - auto ip = string(frames[3]), pubkey = string(frames[6]); - auto result = allow_connection(ip, pubkey); + auto ip = view(frames[3]); + string_view pubkey; + if (bind[bind_id].second.curve) + pubkey = view(frames[6]); + auto result = bind[bind_id].second.allow(ip, pubkey); bool sn = result.remote_sn; auto& user_id = response_vals[4]; - user_id.reserve(64); - to_hex(pubkey.begin(), pubkey.end(), std::back_inserter(user_id)); + if (bind[bind_id].second.curve) { + user_id.reserve(64); + to_hex(pubkey.begin(), pubkey.end(), std::back_inserter(user_id)); + } if (result.auth <= AuthLevel::denied || result.auth > AuthLevel::admin) { - LMQ_LOG(info, "Access denied for incoming ", (sn ? "service node" : "non-SN client"), - " connection from ", user_id, " at ", ip, " with initial auth level ", to_string(result.auth)); + LMQ_LOG(info, "Access denied for incoming ", view(frames[5]), (sn ? " service node" : " client"), + " connection from ", !user_id.empty() ? user_id + " at " : ""s, ip, + " with initial auth level ", to_string(result.auth)); status_code = "400"; status_text = "Access denied"; user_id.clear(); - } - LMQ_LOG(info, "Accepted incoming ", (sn ? "service node" : "non-SN client"), - " connection with authentication level ", to_string(result.auth), - " from ", user_id, " at ", ip); + } else { + LMQ_LOG(info, "Accepted incoming ", view(frames[5]), (sn ? " service node" : " client"), + " connection with authentication level ", to_string(result.auth), + " from ", !user_id.empty() ? user_id + " at " : ""s, ip); - auto& metadata = response_vals[5]; - if (result.remote_sn) - metadata += zmtp_metadata("X-SN", "1"); - if (result.auth != AuthLevel::none) + auto& metadata = response_vals[5]; + metadata += zmtp_metadata("X-SN", result.remote_sn ? "1" : "0"); metadata += zmtp_metadata("X-AuthLevel", to_string(result.auth)); - status_code = "200"; - status_text = ""; + status_code = "200"; + status_text = ""; + } } } @@ -1612,34 +1720,51 @@ LokiMQ::~LokiMQ() { LMQ_LOG(info, "LokiMQ proxy thread has stopped"); } -void LokiMQ::connect_sn(string_view pubkey, std::chrono::milliseconds keep_alive, string_view hint) { +ConnectionID LokiMQ::connect_sn(string_view pubkey, std::chrono::milliseconds keep_alive, string_view hint) { check_started(proxy_thread, "connect"); detail::send_control(get_control_socket(), "CONNECT_SN", bt_serialize({{"pubkey",pubkey}, {"keep-alive",keep_alive.count()}, {"hint",hint}})); + + return pubkey; } -void LokiMQ::connect_remote(string_view remote, ConnectSuccess on_connect, ConnectFailure on_failure, - string_view pubkey, std::chrono::milliseconds timeout) { +ConnectionID LokiMQ::connect_remote(string_view remote, ConnectSuccess on_connect, ConnectFailure on_failure, + string_view pubkey, AuthLevel auth_level, std::chrono::milliseconds timeout) { if (!proxy_thread.joinable()) LMQ_LOG(warn, "connect_remote() called before start(); this won't take effect until start() is called"); - LMQ_TRACE("telling proxy to connect to ", remote, ", expecting pubkey [", to_hex(pubkey), "]"); + if (remote.size() < 7 || !(remote.substr(0, 6) == "tcp://" || remote.substr(0, 6) == "ipc://" /* unix domain sockets */)) + throw std::runtime_error("Invalid connect_remote: remote address '" + std::string{remote} + "' is not a valid or supported zmq connect string"); + + auto id = next_conn_id++; + LMQ_TRACE("telling proxy to connect to ", remote, ", id ", id, + pubkey.empty() ? "using NULL auth" : ", using CURVE with remote pubkey [" + to_hex(pubkey) + "]"); detail::send_control(get_control_socket(), "CONNECT_REMOTE", bt_serialize({ - {"remote", remote}, - {"pubkey", pubkey}, - {"timeout", timeout.count()}, + {"auth", static_cast>(auth_level)}, + {"conn_id", id}, {"connect", reinterpret_cast(new ConnectSuccess{std::move(on_connect)})}, {"failure", reinterpret_cast(new ConnectFailure{std::move(on_failure)})}, + {"pubkey", pubkey}, + {"remote", remote}, + {"timeout", timeout.count()}, })); + + return id; } void LokiMQ::proxy_connect_remote(bt_dict_consumer data) { + AuthLevel auth_level = AuthLevel::none; + long long conn_id = -1; ConnectSuccess on_connect; ConnectFailure on_failure; std::string remote; std::string remote_pubkey; std::chrono::milliseconds timeout = REMOTE_CONNECT_TIMEOUT; + if (data.skip_until("auth_level")) + auth_level = static_cast(data.consume_integer>()); + if (data.skip_until("conn_id")) + conn_id = data.consume_integer(); if (data.skip_until("connect")) { auto* ptr = reinterpret_cast(data.consume_integer()); on_connect = std::move(*ptr); @@ -1654,38 +1779,85 @@ void LokiMQ::proxy_connect_remote(bt_dict_consumer data) { remote_pubkey = data.consume_string(); assert(remote_pubkey.size() == 32 || remote_pubkey.empty()); } - if (data.skip_until("remote")) { + if (data.skip_until("remote")) remote = data.consume_string(); - } if (data.skip_until("timeout")) timeout = std::chrono::milliseconds{data.consume_integer()}; - if (remote.empty()) - throw std::runtime_error("Internal error: CONNECT_REMOTE proxy command missing required 'remote' value"); + if (conn_id == -1 || remote.empty()) + throw std::runtime_error("Internal error: CONNECT_REMOTE proxy command missing required 'conn_id' and/or 'remote' value"); - LMQ_LOG(info, "Establishing remote connection to ", remote, remote_pubkey.empty() ? " (any pubkey)" : " expecting pubkey " + to_hex(remote_pubkey)); + LMQ_LOG(info, "Establishing remote connection to ", remote, remote_pubkey.empty() ? " (NULL auth)" : " via CURVE expecting pubkey " + to_hex(remote_pubkey)); + + assert(conn_index_to_id.size() == connections.size()); zmq::socket_t sock{context, zmq::socket_type::dealer}; try { setup_outgoing_socket(sock, remote_pubkey); sock.connect(remote); } catch (const zmq::error_t &e) { - proxy_schedule_job([on_failure=std::move(on_failure), what="connect() failed: "s+e.what()] { on_failure(std::move(what)); }); + proxy_schedule_job([conn_id, on_failure=std::move(on_failure), what="connect() failed: "s+e.what()] { + on_failure(conn_id, std::move(what)); + }); return; } - LMQ_LOG(debug, "Opened new zmq socket to ", remote, ", sending HI"); - send_direct_message(sock, "HI"); - add_pollitem(sock); - remotes.emplace_back("", std::move(sock)); - pending_connects.emplace_back(remotes.size()-1, std::chrono::steady_clock::now() + timeout, + connections.push_back(std::move(sock)); + LMQ_LOG(debug, "Opened new zmq socket to ", remote, ", conn_id ", conn_id, "; sending HI"); + send_direct_message(connections.back(), "HI"); + pending_connects.emplace_back(connections.size()-1, conn_id, std::chrono::steady_clock::now() + timeout, std::move(on_connect), std::move(on_failure)); + peer_info peer; + peer.pubkey = std::move(remote_pubkey); + peer.service_node = false; + peer.auth_level = auth_level; + peer.conn_index = connections.size() - 1; + ConnectionID conn{conn_id, peer.pubkey}; + conn_index_to_id.push_back(conn); + assert(connections.size() == conn_index_to_id.size()); + peer.idle_expiry = 24h * 10 * 365; // "forever" + peer.activity(); + peers.emplace(std::move(conn), std::move(peer)); } -void LokiMQ::disconnect_remote(string_view id, std::chrono::milliseconds linger) { - (void)id, (void)linger; +void LokiMQ::disconnect(ConnectionID id, std::chrono::milliseconds linger) { + detail::send_control(get_control_socket(), "DISCONNECT", bt_serialize({ + {"conn_id", id.id}, + {"linger_ms", linger.count()}, + {"pubkey", id.pk}, + })); } +void LokiMQ::proxy_disconnect(bt_dict_consumer data) { + ConnectionID connid{-1}; + std::chrono::milliseconds linger = 1s; + + if (data.skip_until("conn_id")) + connid.id = data.consume_integer(); + if (data.skip_until("linger_ms")) + linger = std::chrono::milliseconds(data.consume_integer()); + if (data.skip_until("pubkey")) + connid.pk = data.consume_string(); + + if (connid.sn() && connid.pk.size() != 32) + throw std::runtime_error("Error: invalid disconnect of SN without a valid pubkey"); + + proxy_disconnect(std::move(connid), linger); +} +void LokiMQ::proxy_disconnect(ConnectionID conn, std::chrono::milliseconds linger) { + LMQ_TRACE("Disconnecting outgoing connection to ", conn); + auto pr = peers.equal_range(conn); + for (auto it = pr.first; it != pr.second; ++it) { + auto& peer = it->second; + if (peer.outgoing()) { + LMQ_LOG(info, "Closing outgoing connection to ", conn); + proxy_close_connection(peer.conn_index, linger); + peers.erase(it); + return; + } + } + LMQ_LOG(warn, "Failed to disconnect ", conn, ": no such outgoing connection"); +} void LokiMQ::job(std::function f) { auto* b = new Batch; diff --git a/lokimq/lokimq.h b/lokimq/lokimq.h index e1d38c4..b8f341f 100644 --- a/lokimq/lokimq.h +++ b/lokimq/lokimq.h @@ -76,9 +76,8 @@ struct Access { bool local_sn = false; }; -/// Return type of the AllowFunc: this determines whether we allow the connection at all, and if -/// so, sets the initial authentication level and tells LokiMQ whether the other hand is an -/// active SN. +/// Return type of the AllowFunc: this determines whether we allow the connection at all, and if so, +/// sets the initial authentication level and tells LokiMQ whether the other end is an active SN. struct Allow { AuthLevel auth = AuthLevel::none; bool remote_sn = false; @@ -86,6 +85,69 @@ struct Allow { class LokiMQ; +/// Opaque data structure representing a connection which supports ==, !=, < and std::hash. For +/// connections to service node this is the service node pubkey (and you can pass a 32-byte string +/// anywhere a ConnectionID is called for). For non-SN remote connections you need to keep a copy +/// of the ConnectionID returned by connect_remote(). +struct ConnectionID { + ConnectionID(std::string pubkey_) : id{SN_ID}, pk{std::move(pubkey_)} { + if (pk.size() != 32) + throw std::runtime_error{"Invalid pubkey: expected 32 bytes"}; + } + ConnectionID(string_view pubkey_) : ConnectionID(std::string{pubkey_}) {} + ConnectionID(const ConnectionID&) = default; + ConnectionID(ConnectionID&&) = default; + ConnectionID& operator=(const ConnectionID&) = default; + ConnectionID& operator=(ConnectionID&&) = default; + + // Two ConnectionIDs are equal if they are both SNs and have matching pubkeys, or they are both + // not SNs and have matching internal IDs. (Pubkeys do not have to match for non-SNs). + bool operator==(const ConnectionID &o) const { + if (id == SN_ID && o.id == SN_ID) + return pk == o.pk; + return id == o.id; + } + bool operator!=(const ConnectionID &o) const { return !(*this == o); } + bool operator<(const ConnectionID &o) const { + if (id == SN_ID && o.id == SN_ID) + return pk < o.pk; + return id < o.id; + } + // Returns true if this ConnectionID represents a SN connection + bool sn() const { return id == SN_ID; } + + // Returns this connection's pubkey, if any. (Note that it is possible to have a pubkey and not + // be a SN when connecting to secure remotes: having a non-empty pubkey does not imply that + // `sn()` is true). + const std::string& pubkey() const { return pk; } + // Default construction; creates a ConnectionID with an invalid internal ID that will not match + // an actual connection. + ConnectionID() : ConnectionID(0) {} +private: + ConnectionID(long long id) : id{id} {} + ConnectionID(long long id, std::string pubkey) : id{id}, pk{std::move(pubkey)} {} + + constexpr static long long SN_ID = -1; + long long id = 0; + std::string pk; + friend class LokiMQ; + friend class std::hash; + + friend std::ostream& operator<<(std::ostream& o, const ConnectionID& conn); +}; + +} // namespace lokimq +namespace std { + // Need this here because we stick it in an unordered_map below. + template <> struct hash { + size_t operator()(const lokimq::ConnectionID &c) const { + return c.sn() ? std::hash{}(c.pk) : + std::hash{}(c.id); + } + }; +} // namespace std +namespace lokimq { + /// Encapsulates an incoming message from a remote connection with message details plus extra /// info need to send a reply back through the proxy thread via the `reply()` method. Note that /// this object gets reused: callbacks should use but not store any reference beyond the callback. @@ -93,13 +155,12 @@ class Message { public: LokiMQ& lokimq; ///< The owning LokiMQ object std::vector data; ///< The provided command data parts, if any. - string_view id; ///< The remote's unique, opaque id for routing. - string_view pubkey; ///< The remote's pubkey (32 bytes) - bool service_node; ///< True if the pubkey is an active SN (note that this is only checked on initial connection, not every received message) + ConnectionID conn; ///< The connection info for routing a reply; also contains the pubkey/sn status. + std::string route; ///< The return route for a reply (if the message was on an incoming conn) std::string reply_tag; ///< If the invoked command is a request command this is the required reply tag that will be prepended by `send_reply()`. /// Constructor - Message(LokiMQ& lmq) : lokimq{lmq} {} + Message(LokiMQ& lmq, ConnectionID cid) : lokimq{lmq}, conn{std::move(cid)} {} // Non-copyable Message(const Message&) = delete; @@ -119,9 +180,18 @@ public: /// Sends a reply to a request. This takes no command: the command is always the built-in /// "REPLY" command, followed by the unique reply tag, then any reply data parts. All other - /// arguments are as in `send_back()`. + /// arguments are as in `send_back()`. You should only send one reply for a command expecting + /// replies, though this is not enforced: attempting to send multiple replies will simply be + /// dropped when received by the remote. (Note, however, that it is possible to send multiple + /// messages -- e.g. you could send a reply and then also call send_back() and/or send_request() + /// to send more requests back to the sender). template void send_reply(Args&&... args); + + /// Sends a request back to whomever sent this message. This is effectively a wrapper around + /// lmq.request() that takes care of setting up the recipient arguments. + template + void send_request(string_view cmd, ReplyCallback&& callback, Args&&... args); }; // Forward declarations; see batch.h @@ -150,6 +220,7 @@ static constexpr size_t MAX_CATEGORY_LENGTH = 50; /// Maximum length of a command static constexpr size_t MAX_COMMAND_LENGTH = 200; + /** * Class that handles LokiMQ listeners, connections, proxying, and workers. An application * typically has just one instance of this class. @@ -172,10 +243,6 @@ private: /// True if *this* node is running in service node mode (whether or not actually active) bool local_service_node = false; - /// Addresses on which to listen, or empty if we only establish outgoing connections but aren't - /// listening. - std::vector bind; - /// The thread in which most of the intermediate work happens (handling external connections /// and proxying requests between them to worker threads) std::thread proxy_thread; @@ -200,16 +267,18 @@ public: /// connect to us and to set its initial authentication level. /// /// @param ip - the ip address of the incoming connection - /// @param pubkey - the x25519 pubkey of the connecting client (32 byte string) + /// @param pubkey - the x25519 pubkey of the connecting client (32 byte string). Note that this + /// will only be non-empty for incoming connections on `listen_curve` sockets; `listen_plain` + /// sockets do not have a pubkey. /// /// @returns an `AuthLevel` enum value indicating the default auth level for the incoming /// connection, or AuthLevel::denied if the connection should be refused. - using AllowFunc = std::function; + using AllowFunc = std::function; /// Callback that is invoked when we need to send a "strong" message to a SN that we aren't /// already connected to and need to establish a connection. This callback returns the ZMQ /// connection string we should use which is typically a string such as `tcp://1.2.3.4:5678`. - using SNRemoteAddress = std::function; + using SNRemoteAddress = std::function; /// The callback type for registered commands. using CommandCallback = std::function; @@ -225,9 +294,9 @@ public: using Logger = std::function; /// Callback for the success case of connect_remote() - using ConnectSuccess = std::function; + using ConnectSuccess = std::function; /// Callback for the failure case of connect_remote() - using ConnectFailure = std::function; + using ConnectFailure = std::function; /// Explicitly non-copyable, non-movable because most things here aren't copyable, and a few /// things aren't movable, either. If you need to pass the LokiMQ instance around, wrap it @@ -253,10 +322,7 @@ public: private: /// The lookup function that tells us where to connect to a peer, or empty if not found. - SNRemoteAddress peer_lookup; - - /// Callback to see whether the incoming connection is allowed - AllowFunc allow_connection; + SNRemoteAddress sn_lookup; /// The log level; this is atomic but we use relaxed order to set and access it (so changing it /// might not be instantly visible on all threads, but that's okay). @@ -272,56 +338,80 @@ private: /////////////////////////////////////////////////////////////////////////////////// /// NB: The following are all the domain of the proxy thread (once it is started)! - /// Addresses to bind to in `start()` - std::vector bind_addresses; + /// The socket we listen on for handling ZAP authentication requests (the other end is internal + /// to zmq which sends requests to us as needed). + zmq::socket_t zap_auth{context, zmq::socket_type::rep}; - /// Our listening ROUTER socket for incoming connections (will be left unconnected if not - /// listening). - zmq::socket_t listener; + struct bind_data { + bool curve; + size_t index; + AllowFunc allow; + bind_data(bool curve, AllowFunc allow) + : curve{curve}, index{0}, allow{std::move(allow)} {} + }; - /// Info about a peer's established connection to us. Note that "established" means both + /// Addresses on which we are listening (or, before start(), on which we will listen). + std::vector> bind; + + /// Info about a peer's established connection with us. Note that "established" means both /// connected and authenticated. struct peer_info { - /// Pubkey of the remote; can be empty (especially before handshake) but will only be set if - /// the pubkey has been verified. + /// Pubkey of the remote, if this connection is a curve25519 connection; empty otherwise. std::string pubkey; - /// True if we've authenticated this peer as a service node. + /// True if we've authenticated this peer as a service node. This gets set on incoming + /// messages when we check the remote's pubkey, and immediately on outgoing connections to + /// SNs (since we know their pubkey -- we'll fail to connect if it doesn't match). bool service_node = false; - /// The auth level of this peer + /// The auth level of this peer, as returned by the AllowFunc for incoming connections or + /// specified during outgoing connections. AuthLevel auth_level = AuthLevel::none; - /// Will be set to a non-empty routing prefix if if we have (or at least recently had) an - /// established incoming connection with this peer. Will be empty if there is no incoming - /// connection. - std::string incoming; + /// The actual internal socket index through which this connection is established + size_t conn_index; - /// The index in `remotes` if we have an established outgoing connection to this peer, -1 if - /// we have no outgoing connection to this peer. - int outgoing = -1; + /// Will be set to a non-empty routing prefix *if* one is necessary on the connection. This + /// is used only for SN peers (non-SN incoming connections don't have a peer_info record, + /// and outgoing connections don't have a route). + std::string route; - /// The last time we sent or received a message (or had some other relevant activity) with - /// this peer. Used for closing outgoing connections that have reached an inactivity expiry - /// time. + /// Returns true if this is an outgoing connection. (This is simply an alias for + /// route.empty() -- outgoing connections never have a route, incoming connections always + /// do). + bool outgoing() const { return route.empty(); } + + /// The last time we sent or received a message (or had some other relevant activity) on + /// this connection. Used for closing outgoing connections that have reached an inactivity + /// expiry time (closing inactive conns for incoming connections is done by the other end). std::chrono::steady_clock::time_point last_activity; /// Updates last_activity to the current time void activity() { last_activity = std::chrono::steady_clock::now(); } - /// After more than this much inactivity we will close an idle connection + /// After more than this much inactivity we will close an idle (outgoing) connection std::chrono::milliseconds idle_expiry; }; - /// Currently peer connections: id -> peer_info. id == pubkey for incoming and outgoing SN - /// connections; random string for outgoing direct connections. - std::unordered_map peers; + /// Currently peer connections: id -> peer_info. The ID is as returned by connect_remote or a + /// SN pubkey string. + std::unordered_multimap peers; + + /// Maps connection indices (which can change) to ConnectionIDs (which are permanent). + std::vector conn_index_to_id; + + /// Maps listening socket ConnectionIDs to connection index values (these don't have peers + /// entries) + std::unordered_map incoming_conn_index; + + /// The next ConnectionID value we should use (for non-SN connections). + std::atomic next_conn_id{1}; /// Remotes we are still trying to connect to (via connect_remote(), not connect_sn()); when /// we pass handshaking we move them out of here and (if set) trigger the on_connect callback. /// Unlike regular node-to-node peers, these have an extra "HI"/"HELLO" sequence that we used /// before we consider ourselves connected to the remote. - std::vector> pending_connects; + std::vector> pending_connects; /// Pending requests that have been sent out but not yet received a matching "REPLY". The value /// is the timeout timestamp. @@ -329,26 +419,22 @@ private: pending_requests; /// different polling sockets the proxy handler polls: this always contains some internal - /// sockets for inter-thread communication followed by listener socket and a pollitem for every - /// (outgoing) remote socket in `remotes`. This must be in a sequential vector because of zmq - /// requirements (otherwise it would be far nicer to not have to synchronize the two vectors). + /// sockets for inter-thread communication followed by a pollitem for every connection (both + /// incoming and outgoing) in `connections`. We rebuild this from `connections` whenever + /// `pollitems_stale` is set to true. std::vector pollitems; - /// Properly adds a socket to poll for input to pollitems - void add_pollitem(zmq::socket_t& sock); + /// If set then rebuild pollitems before the next poll (set when establishing new connections or + /// closing existing ones). + bool pollitems_stale = true; - /// The number of internal sockets in `pollitems` - static constexpr size_t poll_internal_size = 3; + /// Rebuilds pollitems to include the internal sockets + all incoming/outgoing sockets. + void rebuild_pollitems(); - /// The pollitems location corresponding to `remotes[0]`. - const size_t poll_remote_offset; // will be poll_internal_size + 1 for a full listener (the +1 is the listening socket); poll_internal_size for a remote-only - - /// The outgoing remote connections we currently have open along with the remote pubkeys. Each + /// The connections to/from remotes we currently have open, both listening and outgoing. Each /// element [i] here corresponds to an the pollitem_t at pollitems[i+1+poll_internal_size]. /// (Ideally we'd use one structure, but zmq requires the pollitems be in contiguous storage). - /// For new connections established via connect_remote the pubkey will be empty until we - /// do the HI/HELLO handshake over the socket. - std::vector> remotes; + std::vector connections; /// Socket we listen on to receive control messages in the proxy thread. Each thread has its own /// internal "control" connection (returned by `get_control_socket()`) to this socket used to @@ -412,7 +498,7 @@ private: /// be done in the proxy thread anyway (if we forwarded to a worker the worker would just have /// to send an instruction back to the proxy to do it). Returns true if one was handled, false /// to continue with sending to a worker. - bool proxy_handle_builtin(int conn_index, std::vector& parts); + bool proxy_handle_builtin(size_t conn_index, std::vector& parts); struct run_info; /// Gets an idle worker's run_info and removes the worker from the idle worker list. If there @@ -434,24 +520,45 @@ private: /// gets called after all works have done so. void proxy_quit(); - // Sets the various properties on an outgoing socket prior to connection. + // Sets the various properties for a listening socket prior to binding. If curve is true then + // the socket is set up using the keys and incoming connections must already know the pubkey to + // establish a connection; otherwise the connection is plaintext without authentication. + void setup_listening_socket(zmq::socket_t& socket, bool curve); + + // Sets the various properties on an outgoing socket prior to connection. If remote_pubkey is + // provided then the connection will be curve25519 encrypted and authenticate; otherwise it will + // be unencrypted and unauthenticated. Note that the remote end must be in the same mode (i.e. + // either accepting curve connections, or not accepting curve). void setup_outgoing_socket(zmq::socket_t& socket, string_view remote_pubkey = {}); /// Common connection implementation used by proxy_connect/proxy_send. Returns the socket /// and, if a routing prefix is needed, the required prefix (or an empty string if not needed). /// For an optional connect that fail, returns nullptr for the socket. - std::pair proxy_connect_sn(string_view pubkey, string_view connect_hint, bool optional, bool incoming_only, std::chrono::milliseconds keep_alive); + /// + /// @param pubkey the pubkey to connect to + /// @param connect_hint if we need a new connection and this is non-empty then we *may* use it + /// instead of doing a call to `sn_lookup()`. + /// @param optional if we don't already have a connection then don't establish a new one + /// @param incoming_only only relay this if we have an established incoming connection from the + /// given SN, otherwise don't connect (like `optional`) + /// @param keep_alive the keep alive for the connection, if we establish a new outgoing + /// connection. If we already have an outgoing connection then its keep-alive gets increased to + /// this if currently less than this. + std::pair proxy_connect_sn(string_view pubkey, string_view connect_hint, + bool optional, bool incoming_only, std::chrono::milliseconds keep_alive); - /// CONNECT_SN command telling us to connect to a new pubkey. Returns the socket (which could be - /// existing or a new one). - std::pair proxy_connect_sn(bt_dict&& data); + /// CONNECT_SN command telling us to connect to a new pubkey. Returns the socket (which could + /// be existing or a new one). This basically just unpacks arguments and passes them on to + /// proxy_connect_sn(). + std::pair proxy_connect_sn(bt_dict_consumer data); /// Opens a new connection to a remote, with callbacks. This is the proxy-side implementation /// of the `connect_remote()` call. void proxy_connect_remote(bt_dict_consumer data); - /// Called to disconnect our remote connection to the given pubkey (if we have one). - void proxy_disconnect(const std::string& pubkey); + /// Called to disconnect our remote connection to the given id (if we have one). + void proxy_disconnect(bt_dict_consumer data); + void proxy_disconnect(ConnectionID conn, std::chrono::milliseconds linger); /// SEND command. Does a connect first, if necessary. void proxy_send(bt_dict_consumer data); @@ -480,10 +587,9 @@ private: /// Same, but deserialized void proxy_timer(std::function job, std::chrono::milliseconds interval, bool squelch); - /// ZAP (https://rfc.zeromq.org/spec:27/ZAP/) authentication handler; this is called with the - /// zap auth socket to do non-blocking processing of any waiting authentication requests waiting - /// on it to verify whether the connection is from a valid/allowed SN. - void process_zap_requests(zmq::socket_t& zap_auth); + /// ZAP (https://rfc.zeromq.org/spec:27/ZAP/) authentication handler; this does non-blocking + /// processing of any waiting authentication requests for new incoming connections. + void process_zap_requests(); /// Handles a control message from some outer thread to the proxy void proxy_control_message(std::vector& parts); @@ -493,7 +599,7 @@ private: void proxy_expire_idle_peers(); /// Helper method to actually close a remote connection and update the stuff that needs updating. - void proxy_close_remote(int removed, bool linger = true); + void proxy_close_connection(size_t removed, std::chrono::milliseconds linger); /// Closes an outgoing connection immediately, updates internal variables appropriately. /// Returns the next iterator (the original may or may not be removed from peers, depending on @@ -527,7 +633,7 @@ private: /// Checks a peer's authentication level. Returns true if allowed, warns and returns false if /// not. - bool proxy_check_auth(string_view pubkey, size_t conn_index, const peer_info& peer, + bool proxy_check_auth(size_t conn_index, bool outgoing, const peer_info& peer, const std::string& command, const category& cat, zmq::message_t& msg); /// Details for a pending command; such a command already has authenticated access and is just @@ -537,12 +643,13 @@ private: std::string command; std::vector data_parts; const std::pair* callback; - std::string pubkey; - std::string id; - bool service_node; + ConnectionID conn; + std::string conn_route; - pending_command(category& cat, std::string command, std::vector data_parts, const std::pair* callback, std::string pubkey, bool service_node) - : cat{cat}, command{std::move(command)}, data_parts{std::move(data_parts)}, callback{callback}, pubkey{std::move(pubkey)}, service_node{service_node} {} + pending_command(category& cat, std::string command, std::vector data_parts, + const std::pair* callback, ConnectionID conn, std::string conn_route) + : cat{cat}, command{std::move(command)}, data_parts{std::move(data_parts)}, + callback{callback}, conn{std::move(conn)}, conn_route{std::move(conn_route)} {} }; std::list pending_commands; @@ -560,8 +667,8 @@ private: // these shouldn't be accessed and likely contain stale data). category *cat; std::string command; - std::string pubkey; - bool service_node = false; + ConnectionID conn; // The connection (or SN pubkey) to reply on/to. + std::string conn_route; // if non-empty this is the reply routing prefix (for incoming connections) std::vector data_parts; // If is_batch_job true then these are set (if is_batch_job false then don't access these!): @@ -573,9 +680,9 @@ private: }; // These belong to the proxy thread and must not be accessed by a worker: - std::thread thread; + std::thread worker_thread; size_t worker_id; // The index in `workers` - std::string routing_id; // "w123" where 123 == worker_id + std::string worker_routing_id; // "w123" where 123 == worker_id /// Loads the run info with a pending command run_info& operator=(pending_command&& pending); @@ -606,15 +713,13 @@ public: * *capable* of being a service node, whether or not we are currently actively). If specified * as true then the pubkey and privkey values must not be empty. * - * @param bind list of addresses to bind to. Can be any string zmq supports; typically a tcp - * IP/port combination such as: "tcp://\*:4567" or "tcp://1.2.3.4:5678". Can be empty to not - * listen at all. - * - * @param peer_lookup function that takes a pubkey key (32-byte binary string) and returns a + * @param sn_lookup function that takes a pubkey key (32-byte binary string) and returns a * connection string such as "tcp://1.2.3.4:23456" to which a connection should be established * to reach that service node. Note that this function is only called if there is no existing * connection to that service node, and that the function is never called for a connection to - * self (that uses an internal connection instead). Should return empty for not found. + * self (that uses an internal connection instead). Also note that the service node must be + * listening in curve25519 mode (otherwise we couldn't verify its authenticity). Should return + * empty for not found or if SN lookups are not supported. * * @param allow_incoming is a callback that LokiMQ can use to determine whether an incoming * connection should be allowed at all and, if so, whether the connection is from a known @@ -629,21 +734,16 @@ public: LokiMQ( std::string pubkey, std::string privkey, bool service_node, - std::vector bind, - SNRemoteAddress peer_lookup, - AllowFunc allow_connection, + SNRemoteAddress sn_lookup, Logger logger = [](LogLevel, const char*, int, std::string) { }); /** - * Simplified LokiMQ constructor for a client. This does not bind, generates ephemeral keys, - * and doesn't have peer_lookup capabilities, and treats all remotes as "basic", non-service - * node connections (for command authenication purposes). + * Simplified LokiMQ constructor for a simple listener without any SN connection/authentication + * capabilities. This treats all remotes as "basic", non-service node connections for command + * authentication purposes. */ explicit LokiMQ(Logger logger = [](LogLevel, const char*, int, std::string) { }) - : LokiMQ("", "", false, {}, - [](const auto&) { return std::string{}; }, - [](string_view, string_view) { return Allow{AuthLevel::basic}; }, - std::move(logger)) {} + : LokiMQ("", "", false, [](auto) { return ""s; /*no peer lookups*/ }, std::move(logger)) {} /** * Destructor; instructs the proxy to quit. The proxy tells all workers to quit, waits for them @@ -757,6 +857,33 @@ public: */ void start(); + /** Start listening on the given bind address using curve authentication/encryption. Incoming + * connections will only be allowed from clients that already have the server's pubkey, and + * will be encrypted. `allow_connection` is invoked for any incoming connections on this + * address to determine the incoming remote's access and authentication level. + * + * @param bind address - can be any string zmq supports; typically a tcp IP/port combination + * such as: "tcp://\*:4567" or "tcp://1.2.3.4:5678". + * + * @param allow_connection function to call to determine whether to allow the connection and, if + * so, the authentication level it receives. If omitted the default returns non-service node, + * AuthLevel::none access. + */ + void listen_curve(std::string bind, AllowFunc allow_connection = [](auto, auto) { return Allow{AuthLevel::none, false}; }); + + /** Start listening on the given bind address in unauthenticated plain text mode. Incoming + * connections can come from anywhere. `allow_connection` is invoked for any incoming + * connections on this address to determine the incoming remote's access and authentication + * level. Note that `allow_connection` here will be called with an empty pubkey. + * + * @param bind address - can be any string zmq supports; typically a tcp IP/port combination + * such as: "tcp://\*:4567" or "tcp://1.2.3.4:5678". + * + * @param allow_connection function to call to determine whether to allow the connection and, if + * so, the authentication level it receives. + */ + void listen_plain(std::string bind, AllowFunc allow_connection); + /** * Try to initiate a connection to the given SN in anticipation of needing a connection in the * future. If a connection is already established, the connection's idle timer will be reset @@ -772,89 +899,120 @@ public: * @param keep_alive - the connection will be kept alive if there was valid activity within * the past `keep_alive` milliseconds. If an outgoing connection already * exists, the longer of the existing and the given keep alive is used. - * Note that the default applied here is much longer than the default for an - * implicit connect() by calling send() directly. + * (Note that the default applied here is much longer than the default for an + * implicit connect() by calling send() directly.) * @param hint - if non-empty and a new outgoing connection needs to be made this hint value * may be used instead of calling the lookup function. (Note that there is no * guarantee that the hint will be used; it is only usefully specified if the - * connection location has already been incidentally determined). + * connection address has already been incidentally determined). + * + * @returns a ConnectionID that identifies an connection with the given SN. Typically you + * *don't* need to worry about this (and can just discard it): you can always simply pass the + * pubkey as a string wherever a ConnectionID is called. */ - void connect_sn(string_view pubkey, std::chrono::milliseconds keep_alive = 5min, string_view hint = {}); + ConnectionID connect_sn(string_view pubkey, std::chrono::milliseconds keep_alive = 5min, string_view hint = {}); /** * Establish a connection to the given remote with callbacks invoked on a successful or failed - * connection. The success callback gives you the pubkey of the remote, which can then be used - * to send commands to the remote (via `send()`). is generally intended for cases where the remote is - * being treated as the "server" and the local connection as a "client"; for connections between - * peers (i.e. between SNs) you generally want connect_sn() instead. If pubkey is non-empty - * then the remote must have that pubkey; if empty then any pubkey is allowed. + * connection. Returns a ConnectionID associated with the connection being attempted. It is + * possible to send to the remote before the successful callback is invoked, but there is no + * guarantee that the messages will be delivered (e.g. if the connection ultimately fails). * - * Unlike `connect_sn`, the connection established here will be kept open - * indefinitely (until you call disconnect). + * For connections to a service node you generally want connect_sn() instead (which verifies + * that it is talking to the SN and encrypts the connection). + * + * Unlike `connect_sn`, the connection established here will be kept open indefinitely (until + * you call disconnect). * * The `on_connect` and `on_failure` callbacks are invoked when a connection has been * established or failed to establish. * * @param remote the remote connection address, such as `tcp://localhost:1234`. - * @param on_connect called with the identifier and the remote's pubkey after the connection has - * been established and handshaked. - * @param on_failure called with a failure message if we fail to connect. - * @param pubkey the required remote pubkey (empty to accept any). + * @param on_connect called with the identifier after the connection has been established. + * @param on_failure called with the identifier and failure message if we fail to connect. + * @param pubkey if non-empty then connect securely (using curve encryption) and verify that the + * remote's pubkey equals the given value. Specifying this is similar to using connect_sn() + * except that we do not treat the remote as a SN for command authorization purposes. + * @param auth_level determines the authentication level of the remote for issuing commands to + * us. The default is `AuthLevel::none`. * @param timeout how long to try before aborting the connection attempt and calling the * on_failure callback. Note that the connection can fail for various reasons before the * timeout. + * + * @param returns ConnectionID that uniquely identifies the connection to this remote node. In + * order to talk to it you will need the returned value (or a copy of it). */ - void connect_remote(string_view remote, ConnectSuccess on_connect, ConnectFailure on_failure, - string_view pubkey = {}, std::chrono::milliseconds timeout = REMOTE_CONNECT_TIMEOUT); + ConnectionID connect_remote(string_view remote, ConnectSuccess on_connect, ConnectFailure on_failure, + string_view pubkey = {}, + AuthLevel auth_level = AuthLevel::none, + std::chrono::milliseconds timeout = REMOTE_CONNECT_TIMEOUT); /** - * Disconnects an established outgoing connection established with `connect_remote()`. + * Disconnects an established outgoing connection established with `connect_remote()` (or, less + * commonly, `connect_sn()`). * - * @param id the connection id, as returned by `connect_remote()`. + * @param id the connection id, as returned by `connect_remote()` or the SN pubkey. * * @param linger how long to allow the connection to linger while there are still pending * outbound messages to it before disconnecting and dropping any pending messages. (Note that * this lingering is internal; the disconnect_remote() call does not block). The default is 1 * second. + * + * If given a pubkey, we try to close an outgoing connection to the given SN if one exists; note + * however that this is often not particularly useful as messages to that SN can immediately + * reopen the connection. */ - void disconnect_remote(string_view id, std::chrono::milliseconds linger = 1s); + void disconnect(ConnectionID id, std::chrono::milliseconds linger = 1s); /** - * Queue a message to be relayed to the node identified with the given identifier (for SNs and - * incoming connections this is a pubkey; for connections established with `connect()` this will - * be the opaque string returned by `connect()`), without expecting a reply. LokiMQ will - * attempt to relay the message (first connecting and handshaking if not already connected - * and the given pubkey is a service node's pubkey). + * Queue a message to be relayed to the given service node or remote without requiring a reply. + * LokiMQ will attempt to relay the message (first connecting and handshaking to the remote SN + * if not already connected). * * If a new connection is established it will have a relatively short (30s) idle timeout. If - * the connection should stay open longer you should call `connect(pubkey, IDLETIME)` first. + * the connection should stay open longer you should either call `connect(pubkey, IDLETIME)` or + * pass a a `send_option::keep_alive{IDLETIME}` in `opts`. * * Note that this method (along with connect) doesn't block waiting for a connection or for the * message to send; it merely instructs the proxy thread that it should send. ZMQ will * generally try hard to deliver it (reconnecting if the connection fails), but if the * connection fails persistently the message will eventually be dropped. * - * @param id - the pubkey or identifier returned by `connect()` to send this to + * @param remote - either a ConnectionID value returned by connect_remote, or a service node + * pubkey string. In the latter case, sending the message may trigger a new + * connection being established to the service node (i.e. you do not have to + * call connect() first). * @param cmd - the first data frame value which is almost always the remote "category.command" name - * @param opts - any number of std::string and send options. Each send option affects - * how the send works; each string becomes a serialized message part. + * @param opts - any number of std::string (or string_views) and send options. Each send option + * affects how the send works; each string becomes a message part. * * Example: * - * lmq.send(pubkey, "hello", "abc", send_option::hint("tcp://localhost:1234"), "def"); + * // Send to a SN, connecting to it if we aren't already connected: + * lmq.send(pubkey, "hello.world", "abc", send_option::hint("tcp://localhost:1234"), "def"); * - * sends the command `hello` to the given pubkey, containing additional message parts "abc" and - * "def", and, if not currently connected, using the given connection hint rather than - * performing a connection address lookup on the pubkey. + * // Start connecting to a remote and immediately queue a message for it + * auto conn = lmq.connect_remote("tcp://127.0.0.1:1234", + * [](ConnectionID) { std::cout << "connected\n"; }, + * [](ConnectionID, string_view why) { std::cout << "connection failed: " << why << \n"; }); + * lmq.send(conn, "hello.world", "abc", "def"); + * + * Both of these send the command `hello.world` to the given pubkey, containing additional + * message parts "abc" and "def". In the first case, if not currently connected, the given + * connection hint may be used rather than performing a connection address lookup on the pubkey. */ template - void send(string_view pubkey, string_view cmd, const T&... opts); + void send(ConnectionID to, string_view cmd, const T&... opts); - /** Send a command configured as a "REQUEST" command: the data parts will be prefixed with a - * random identifier. The remote is expected to reply with a ["REPLY", , ...] - * message, at which point we invoke the given callback with any [...] parts of the reply. + /** Send a command configured as a "REQUEST" command to a service node: the data parts will be + * prefixed with a random identifier. The remote is expected to reply with a ["REPLY", + * , ...] message, at which point we invoke the given callback with any [...] parts + * of the reply. * - * @param pubkey - the pubkey to send this request to + * Like `send()`, a new connection to the service node will be established if not already + * connected. + * + * @param to - the pubkey string or ConnectionID to send this request to * @param cmd - the command name * @param callback - the callback to invoke when we get a reply. Called with a true value and * the data strings when a reply is received, or false and an empty vector of data parts if we @@ -862,7 +1020,7 @@ public: * @param opts - anything else (i.e. strings, send_options) is forwarded to send(). */ template - void request(string_view pubkey, string_view cmd, ReplyCallback callback, const T&... opts); + void request(ConnectionID to, string_view cmd, ReplyCallback callback, const T&... opts); /// The key pair this LokiMQ was created with; if empty keys were given during construction then /// this returns the generated keys. @@ -879,7 +1037,7 @@ public: /** * Queues a single job to be executed with no return value. This is a shortcut for creating and - * submitting a single-job, no-completion batch. + * submitting a single-job, no-completion-function batch job. */ void job(std::function f); @@ -928,7 +1086,15 @@ struct incoming {}; /// for the send and its current idle timeout setting is less than this value then it is updated. struct keep_alive { std::chrono::milliseconds time; - keep_alive(std::chrono::milliseconds time) : time{std::move(time)} {} + explicit keep_alive(std::chrono::milliseconds time) : time{std::move(time)} {} +}; + +/// Specifies a routing prefix to be used. This option is required (and added automatically by +/// Message::send and ::reply) when a message is being sent to a non-SN connection on a listening +/// socket, and has no effect otherwise. +struct route { + std::string routing_prefix; + explicit route(std::string r) : routing_prefix{std::move(r)} {} }; } @@ -961,7 +1127,7 @@ inline void apply_send_option(bt_list&, bt_dict& control_data, const send_option control_data["optional"] = 1; } -/// `incoming` specialization: sets the optional flag in the control data +/// `incoming` specialization: sets the incoming-only flag in the control data inline void apply_send_option(bt_list&, bt_dict& control_data, const send_option::incoming &) { control_data["incoming"] = 1; } @@ -971,19 +1137,28 @@ inline void apply_send_option(bt_list&, bt_dict& control_data, const send_option control_data["keep-alive"] = timeout.time.count(); } +/// `route` specialization: adds a routing prefix to be used when sending a non-SN message on an +/// incoming socket. +inline void apply_send_option(bt_list&, bt_dict& control_data, const send_option::route& route) { + control_data["conn_route"] = route.routing_prefix; +} + } // namespace detail template -void LokiMQ::send(string_view pubkey, string_view cmd, const T &...opts) { +void LokiMQ::send(ConnectionID to, string_view cmd, const T &...opts) { bt_dict control_data; - bt_list parts{{cmd}}; + bt_list parts{{std::move(cmd)}}; #ifdef __cpp_fold_expressions (detail::apply_send_option(parts, control_data, opts),...); #else (void) std::initializer_list{(detail::apply_send_option(parts, control_data, opts), 0)...}; #endif - control_data["pubkey"] = pubkey; + if (to.sn()) + control_data["pubkey"] = std::move(to.pk); + else + control_data["conn_id"] = to.id; control_data["send"] = std::move(parts); detail::send_control(get_control_socket(), "SEND", bt_serialize(control_data)); } @@ -991,17 +1166,21 @@ void LokiMQ::send(string_view pubkey, string_view cmd, const T &...opts) { std::string make_random_string(size_t size); template -void LokiMQ::request(string_view pubkey, string_view cmd, ReplyCallback callback, const T &...opts) { - auto reply_tag = make_random_string(15); // 15 should keep us in most stl implementations' small string optimization +void LokiMQ::request(ConnectionID to, string_view cmd, ReplyCallback callback, const T &...opts) { + auto reply_tag = make_random_string(15); // 15 random bytes is lots and should keep us in most stl implementations' small string optimization bt_dict control_data; - bt_list parts{{cmd, reply_tag}}; + bt_list parts{{std::move(cmd), reply_tag}}; #ifdef __cpp_fold_expressions (detail::apply_send_option(parts, control_data, opts),...); #else (void) std::initializer_list{(detail::apply_send_option(parts, control_data, opts), 0)...}; #endif - control_data["pubkey"] = pubkey; + if (to.sn()) + control_data["pubkey"] = std::move(to.pk); + else + control_data["conn_id"] = to.id; + control_data["send"] = std::move(parts); control_data["request"] = true; control_data["request_callback"] = reinterpret_cast(new ReplyCallback{std::move(callback)}); @@ -1011,20 +1190,23 @@ void LokiMQ::request(string_view pubkey, string_view cmd, ReplyCallback callback template void Message::send_back(string_view command, Args&&... args) { - assert(reply_tag.empty()); - if (service_node) lokimq.send(pubkey, command, std::forward(args)...); - else lokimq.send(pubkey, command, send_option::optional{}, std::forward(args)...); + if (conn.sn()) lokimq.send(conn, command, std::forward(args)...); + else lokimq.send(conn, command, send_option::route{route}, send_option::optional{}, std::forward(args)...); } template void Message::send_reply(Args&&... args) { assert(!reply_tag.empty()); - if (service_node) lokimq.send(pubkey, "REPLY", reply_tag, std::forward(args)...); - else lokimq.send(pubkey, "REPLY", reply_tag, send_option::optional{}, std::forward(args)...); + if (conn.sn()) lokimq.send(conn, "REPLY", reply_tag, std::forward(args)...); + else lokimq.send(conn, "REPLY", reply_tag, send_option::route{route}, send_option::optional{}, std::forward(args)...); } - - +template +void Message::send_request(string_view cmd, ReplyCallback&& callback, Args&&... args) { + if (conn.sn()) lokimq.request(conn, cmd, std::forward(callback), std::forward(args)...); + else lokimq.request(conn, cmd, std::forward(callback), + send_option::route{route}, send_option::optional{}, std::forward(args)...); +} template void LokiMQ::log_(LogLevel lvl, const char* file, int line, const T&... stuff) { @@ -1042,6 +1224,6 @@ void LokiMQ::log_(LogLevel lvl, const char* file, int line, const T&... stuff) { std::ostream &operator<<(std::ostream &os, LogLevel lvl); -} +} // namespace lokimq // vim:sw=4:et diff --git a/tests/test_batch.cpp b/tests/test_batch.cpp index fbb3c07..088c08a 100644 --- a/tests/test_batch.cpp +++ b/tests/test_batch.cpp @@ -44,9 +44,7 @@ TEST_CASE("batching many small jobs", "[batch-many]") { lokimq::LokiMQ lmq{ "", "", // generate ephemeral keys false, // not a service node - {}, // don't listen [](auto) { return ""; }, - [](auto ip, auto pk) { return lokimq::Allow{lokimq::AuthLevel::none, false}; }, }; lmq.set_general_threads(4); lmq.set_batch_threads(4); @@ -62,9 +60,7 @@ TEST_CASE("batch exception propagation", "[batch-exceptions]") { lokimq::LokiMQ lmq{ "", "", // generate ephemeral keys false, // not a service node - {}, // don't listen [](auto) { return ""; }, - [](auto ip, auto pk) { return lokimq::Allow{lokimq::AuthLevel::none, false}; }, }; lmq.set_general_threads(4); lmq.set_batch_threads(4); diff --git a/tests/test_commands.cpp b/tests/test_commands.cpp index f65acd7..aa6751a 100644 --- a/tests/test_commands.cpp +++ b/tests/test_commands.cpp @@ -1,5 +1,6 @@ #include "common.h" #include +#include using namespace lokimq; @@ -8,12 +9,11 @@ TEST_CASE("basic commands", "[commands]") { LokiMQ server{ "", "", // generate ephemeral keys false, // not a service node - {listen}, - [](auto &) { return ""; }, - [](auto /*ip*/, auto /*pk*/) { return Allow{AuthLevel::none, false}; }, + [](auto) { return ""; }, get_logger("S» ") }; server.log_level(LogLevel::trace); + server.listen_curve(listen, [](auto /*ip*/, auto /*pk*/) { return Allow{AuthLevel::none, false}; }); std::atomic hellos{0}, his{0}; @@ -25,7 +25,7 @@ TEST_CASE("basic commands", "[commands]") { }); std::string client_pubkey; server.add_command("public", "client.pubkey", [&](Message& m) { - client_pubkey = std::string{m.pubkey}; + client_pubkey = std::string{m.conn.pubkey()}; }); server.start(); @@ -42,9 +42,9 @@ TEST_CASE("basic commands", "[commands]") { std::atomic connected{false}, failed{false}; std::string pubkey; - client.connect_remote(listen, - [&](std::string pk) { pubkey = std::move(pk); connected = true; }, - [&](string_view) { failed = true; }, + auto c = client.connect_remote(listen, + [&](auto conn) { pubkey = conn.pubkey(); connected = true; }, + [&](auto conn, string_view) { failed = true; }, server.get_pubkey()); int i; @@ -56,18 +56,18 @@ TEST_CASE("basic commands", "[commands]") { REQUIRE( connected.load() ); REQUIRE( i <= 1 ); // should be fast REQUIRE( !failed.load() ); - REQUIRE( pubkey == server.get_pubkey() ); + REQUIRE( to_hex(pubkey) == to_hex(server.get_pubkey()) ); - client.send(pubkey, "public.hello"); - client.send(pubkey, "public.client.pubkey"); + client.send(c, "public.hello"); + client.send(c, "public.client.pubkey"); std::this_thread::sleep_for(50ms); REQUIRE( hellos == 1 ); REQUIRE( his == 1 ); - REQUIRE( client_pubkey == client.get_pubkey() ); + REQUIRE( to_hex(client_pubkey) == to_hex(client.get_pubkey()) ); for (int i = 0; i < 50; i++) - client.send(pubkey, "public.hello"); + client.send(c, "public.hello"); std::this_thread::sleep_for(100ms); REQUIRE( hellos == 51 ); diff --git a/tests/test_connect.cpp b/tests/test_connect.cpp index 1dbefd4..4f9e2aa 100644 --- a/tests/test_connect.cpp +++ b/tests/test_connect.cpp @@ -5,18 +5,17 @@ extern "C" { } -TEST_CASE("connections", "[connect][curve]") { +TEST_CASE("connections with curve authentication", "[curve][connect]") { std::string listen = "tcp://127.0.0.1:4455"; LokiMQ server{ "", "", // generate ephemeral keys false, // not a service node - {listen}, [](auto) { return ""; }, - [](auto /*ip*/, auto /*pk*/) { return Allow{AuthLevel::none, false}; }, get_logger("S» ") }; server.log_level(LogLevel::trace); + server.listen_curve(listen, [](auto /*ip*/, auto /*pk*/) { return Allow{AuthLevel::none, false}; }); server.add_category("public", Access{AuthLevel::none}); server.add_request_command("public", "hello", [&](Message& m) { m.send_reply("hi"); }); server.start(); @@ -28,9 +27,9 @@ TEST_CASE("connections", "[connect][curve]") { auto pubkey = server.get_pubkey(); std::atomic connected{0}; - client.connect_remote(listen, - [&](std::string pk) { connected = 1; }, - [&](string_view reason) { auto lock = catch_lock(); INFO("connection failed: " << reason); }, + auto server_conn = client.connect_remote(listen, + [&](auto conn) { connected = 1; }, + [&](auto conn, string_view reason) { auto lock = catch_lock(); INFO("connection failed: " << reason); }, pubkey); int i; @@ -44,7 +43,7 @@ TEST_CASE("connections", "[connect][curve]") { bool success = false; std::vector parts; - client.request(pubkey, "public.hello", [&](auto success_, auto parts_) { success = success_; parts = parts_; }); + client.request(server_conn, "public.hello", [&](auto success_, auto parts_) { success = success_; parts = parts_; }); std::this_thread::sleep_for(50ms); REQUIRE( success ); @@ -58,19 +57,18 @@ TEST_CASE("self-connection SN optimization", "[connect][self]") { LokiMQ sn{ pubkey, privkey, true, - {"tcp://127.0.0.1:5544"}, [&](auto pk) { if (pk == pubkey) return "tcp://127.0.0.1:5544"; else return ""; }, - [&](auto ip, auto pk) { REQUIRE(ip == "127.0.0.1"); return Allow{AuthLevel::none, pk == pubkey}; }, get_logger("S» ") }; + sn.listen_curve("tcp://127.0.0.1:5544", [&](auto ip, auto pk) { REQUIRE(ip == "127.0.0.1"); return Allow{AuthLevel::none, pk == pubkey}; }); sn.add_category("a", Access{AuthLevel::none}); bool invoked = false; sn.add_command("a", "b", [&](const Message& m) { invoked = true; auto lock = catch_lock(); - REQUIRE(m.pubkey == pubkey); - REQUIRE(m.service_node); + REQUIRE(m.conn.sn()); + REQUIRE(m.conn.pubkey() == pubkey); REQUIRE(!m.data.empty()); REQUIRE(m.data[0] == "my data"); }); @@ -82,3 +80,44 @@ TEST_CASE("self-connection SN optimization", "[connect][self]") { std::this_thread::sleep_for(50ms); REQUIRE(invoked); } + +TEST_CASE("plain-text connections", "[plaintext][connect]") { + std::string listen = "tcp://127.0.0.1:4455"; + LokiMQ server{get_logger("S» ")}; + server.log_level(LogLevel::trace); + + server.add_category("public", Access{AuthLevel::none}); + server.add_request_command("public", "hello", [&](Message& m) { m.send_reply("hi"); }); + + server.listen_plain(listen, [](auto /*ip*/, auto /*pk*/) { return Allow{AuthLevel::none, false}; }); + + server.start(); + + LokiMQ client{get_logger("C» ")}; + client.log_level(LogLevel::trace); + + client.start(); + + std::atomic connected{0}; + auto c = client.connect_remote(listen, + [&](auto conn) { connected = 1; }, + [&](auto conn, string_view reason) { auto lock = catch_lock(); INFO("connection failed: " << reason); } + ); + + int i; + for (i = 0; i < 5; i++) { + if (connected.load()) + break; + std::this_thread::sleep_for(50ms); + } + REQUIRE( i <= 1 ); + REQUIRE( connected.load() ); + + bool success = false; + std::vector parts; + client.request(c, "public.hello", [&](auto success_, auto parts_) { success = success_; parts = parts_; }); + std::this_thread::sleep_for(50ms); + REQUIRE( success ); + + +} diff --git a/tests/test_requests.cpp b/tests/test_requests.cpp index 23e5ce6..33b14b5 100644 --- a/tests/test_requests.cpp +++ b/tests/test_requests.cpp @@ -1,5 +1,6 @@ #include "common.h" #include +#include using namespace lokimq; @@ -8,10 +9,9 @@ TEST_CASE("basic requests", "[requests]") { LokiMQ server{ "", "", // generate ephemeral keys false, // not a service node - {listen}, - [](auto &) { return ""; }, - [](auto /*ip*/, auto /*pk*/) { return Allow{AuthLevel::none, false}; }, + [](auto) { return ""; }, }; + server.listen_curve(listen); std::atomic hellos{0}, his{0}; @@ -31,9 +31,9 @@ TEST_CASE("basic requests", "[requests]") { std::atomic connected{false}, failed{false}; std::string pubkey; - client.connect_remote(listen, - [&](std::string pk) { pubkey = std::move(pk); connected = true; }, - [&](string_view) { failed = true; }, + auto c = client.connect_remote(listen, + [&](auto conn) { pubkey = conn.pubkey(); connected = true; }, + [&](auto, auto) { failed = true; }, server.get_pubkey()); int i; @@ -45,12 +45,70 @@ TEST_CASE("basic requests", "[requests]") { REQUIRE( connected.load() ); REQUIRE( !failed.load() ); REQUIRE( i <= 1 ); - REQUIRE( pubkey == server.get_pubkey() ); + REQUIRE( to_hex(pubkey) == to_hex(server.get_pubkey()) ); std::atomic got_reply{false}; bool success; std::vector data; - client.request(pubkey, "public.hello", [&](bool ok, std::vector data_) { + client.request(c, "public.hello", [&](bool ok, std::vector data_) { + got_reply = true; + success = ok; + data = std::move(data_); + }); + + std::this_thread::sleep_for(50ms); + REQUIRE( got_reply.load() ); + REQUIRE( success ); + REQUIRE( data == std::vector{{"123"}} ); +} + +TEST_CASE("request from server to client", "[requests]") { + std::string listen = "tcp://127.0.0.1:5678"; + LokiMQ server{ + "", "", // generate ephemeral keys + false, // not a service node + [](auto) { return ""; }, + }; + server.listen_curve(listen); + + std::atomic hellos{0}, his{0}; + + server.add_category("public", Access{AuthLevel::none}); + server.add_request_command("public", "hello", [&](Message& m) { + m.send_reply("123"); + }); + server.start(); + + LokiMQ client( + [](LogLevel, const char* file, int line, std::string msg) { std::cerr << file << ":" << line << " --C-- " << msg << "\n"; } + ); + //client.log_level(LogLevel::trace); + + client.start(); + + std::atomic connected{false}, failed{false}; + std::string pubkey; + + auto c = client.connect_remote(listen, + [&](auto conn) { pubkey = conn.pubkey(); connected = true; }, + [&](auto, auto) { failed = true; }, + server.get_pubkey()); + + int i; + for (i = 0; i < 5; i++) { + if (connected.load()) + break; + std::this_thread::sleep_for(50ms); + } + REQUIRE( connected.load() ); + REQUIRE( !failed.load() ); + REQUIRE( i <= 1 ); + REQUIRE( to_hex(pubkey) == to_hex(server.get_pubkey()) ); + + std::atomic got_reply{false}; + bool success; + std::vector data; + client.request(c, "public.hello", [&](bool ok, std::vector data_) { got_reply = true; success = ok; data = std::move(data_);