From bcca8dd34e74c8ed1339fbc0dfd0aaaac4e583d7 Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Sun, 29 Mar 2020 11:34:55 -0300 Subject: [PATCH] Catch errors on internal msgs; support non-blocking sends When we try to route an internal message ("BYE", "NOT_A_SERVICE_NODE", etc.) back to the remote from the proxy thread we can end up trying to send to a disconnected remote, which raises an exception, but this isn't caught in proxy code: fix this by catching and ignoring it. This also changes the code to send these messages in "dontwait" mode so that if we can't queue the message we get (and ignore) an exception rather than blocking. --- lokimq/auth.cpp | 18 +++++++++++------- lokimq/lokimq-internal.h | 27 ++++++++++++++------------- lokimq/worker.cpp | 12 ++++++++---- 3 files changed, 33 insertions(+), 24 deletions(-) diff --git a/lokimq/auth.cpp b/lokimq/auth.cpp index 5e0ed8c..a7c58b3 100644 --- a/lokimq/auth.cpp +++ b/lokimq/auth.cpp @@ -48,20 +48,24 @@ bool LokiMQ::proxy_check_auth(size_t conn_index, bool outgoing, const peer_info& // Disconnect: we don't think the remote is a SN, but it issued a command only SNs should be // issuing. Drop the connection; if the remote has something important to relay it will // reconnect, at which point we will reassess the SN status on the new incoming connection. - if (outgoing) + if (outgoing) { proxy_disconnect(peer.service_node ? ConnectionID{peer.pubkey} : conn_index_to_id[conn_index], 1s); + return false; + } else - send_routed_message(connections[conn_index], peer.route, "BYE"); - return false; + reply = "BYE"; } if (reply.empty()) return true; - if (outgoing) - send_direct_message(connections[conn_index], std::move(reply), command); - else - send_routed_message(connections[conn_index], peer.route, std::move(reply), command); + try { + if (outgoing) + send_direct_message(connections[conn_index], std::move(reply), command, zmq::send_flags::dontwait); + else + send_routed_message(connections[conn_index], peer.route, std::move(reply), command, zmq::send_flags::dontwait); + } catch (const zmq::error_t&) { /* can't send: possibly already disconnected. Ignore. */ } + return false; } diff --git a/lokimq/lokimq-internal.h b/lokimq/lokimq-internal.h index b15b468..b5c190c 100644 --- a/lokimq/lokimq-internal.h +++ b/lokimq/lokimq-internal.h @@ -38,40 +38,41 @@ inline zmq::message_t create_message(string_view data) { } template -void send_message_parts(zmq::socket_t &sock, It begin, It end) { +void send_message_parts(zmq::socket_t &sock, It begin, It end, const zmq::send_flags flags = zmq::send_flags::none) { while (begin != end) { - // FIXME: for outgoing connections on ZMQ_DEALER we want to use ZMQ_DONTWAIT and handle - // EAGAIN error (which either means the peer HWM is hit -- probably indicating a connection - // failure -- or the underlying connect() system call failed). Assuming it's an outgoing - // connection, we should destroy it. zmq::message_t &msg = *begin++; - sock.send(msg, begin == end ? zmq::send_flags::none : zmq::send_flags::sndmore); + sock.send(msg, begin == end ? flags : flags | zmq::send_flags::sndmore); } } template -void send_message_parts(zmq::socket_t &sock, Container &&c) { - send_message_parts(sock, c.begin(), c.end()); +void send_message_parts(zmq::socket_t &sock, Container &&c, zmq::send_flags flags = zmq::send_flags::none) { + send_message_parts(sock, c.begin(), c.end(), flags); } /// Sends a message with an initial route. `msg` and `data` can be empty: if `msg` is empty then /// the msg frame will be an empty message; if `data` is empty then the data frame will be omitted. -inline void send_routed_message(zmq::socket_t &socket, std::string route, std::string msg = {}, std::string data = {}) { +/// `flags` is passed through to zmq: typically given `zmq::send_flags::dontwait` to throw rather +/// than block if a message can't be queued. +inline void send_routed_message(zmq::socket_t &socket, std::string route, std::string msg = {}, std::string data = {}, + zmq::send_flags flags = zmq::send_flags::none) { assert(!route.empty()); std::array msgs{{create_message(std::move(route))}}; if (!msg.empty()) msgs[1] = create_message(std::move(msg)); if (!data.empty()) msgs[2] = create_message(std::move(data)); - send_message_parts(socket, msgs.begin(), data.empty() ? std::prev(msgs.end()) : msgs.end()); + send_message_parts(socket, msgs.begin(), data.empty() ? std::prev(msgs.end()) : msgs.end(), flags); } -// Sends some stuff to a socket directly. -inline void send_direct_message(zmq::socket_t &socket, std::string msg, std::string data = {}) { +// Sends some stuff to a socket directly. If dontwait is true then we throw instead of blocking if +// the message cannot be accepted by zmq (i.e. because the outgoing buffer is full). +inline void send_direct_message(zmq::socket_t &socket, std::string msg, std::string data = {}, + zmq::send_flags flags = zmq::send_flags::none) { std::array msgs{{create_message(std::move(msg))}}; if (!data.empty()) msgs[1] = create_message(std::move(data)); - send_message_parts(socket, msgs.begin(), data.empty() ? std::prev(msgs.end()) : msgs.end()); + send_message_parts(socket, msgs.begin(), data.empty() ? std::prev(msgs.end()) : msgs.end(), flags); } // Receive all the parts of a single message from the given socket. Returns true if a message was diff --git a/lokimq/worker.cpp b/lokimq/worker.cpp index 66f6229..916ca77 100644 --- a/lokimq/worker.cpp +++ b/lokimq/worker.cpp @@ -1,5 +1,6 @@ #include "lokimq.h" #include "batch.h" +#include "hex.h" #include "lokimq-internal.h" namespace lokimq { @@ -230,10 +231,13 @@ void LokiMQ::proxy_to_worker(size_t conn_index, std::vector& par auto cat_call = get_command(command); if (!cat_call.first) { - if (outgoing) - send_direct_message(connections[conn_index], "UNKNOWNCOMMAND", command); - else - send_routed_message(connections[conn_index], peer->route, "UNKNOWNCOMMAND", command); + LMQ_LOG(warn, "Invalid command '", command, "' sent by remote [", to_hex(peer->pubkey), "]/", peer_address(parts.back())); + try { + if (outgoing) + send_direct_message(connections[conn_index], "UNKNOWNCOMMAND", command, zmq::send_flags::dontwait); + else + send_routed_message(connections[conn_index], peer->route, "UNKNOWNCOMMAND", command, zmq::send_flags::dontwait); + } catch (const zmq::error_t&) { /* can't send: possibly already disconnected. Ignore. */ } return; }