mirror of https://github.com/oxen-io/oxen-mq.git
Catch errors on internal msgs; support non-blocking sends
When we try to route an internal message ("BYE", "NOT_A_SERVICE_NODE", etc.) back to the remote from the proxy thread we can end up trying to send to a disconnected remote, which raises an exception, but this isn't caught in proxy code: fix this by catching and ignoring it. This also changes the code to send these messages in "dontwait" mode so that if we can't queue the message we get (and ignore) an exception rather than blocking.
This commit is contained in:
parent
7f9141a4a9
commit
bcca8dd34e
|
@ -48,20 +48,24 @@ bool LokiMQ::proxy_check_auth(size_t conn_index, bool outgoing, const peer_info&
|
||||||
// Disconnect: we don't think the remote is a SN, but it issued a command only SNs should be
|
// Disconnect: we don't think the remote is a SN, but it issued a command only SNs should be
|
||||||
// issuing. Drop the connection; if the remote has something important to relay it will
|
// issuing. Drop the connection; if the remote has something important to relay it will
|
||||||
// reconnect, at which point we will reassess the SN status on the new incoming connection.
|
// reconnect, at which point we will reassess the SN status on the new incoming connection.
|
||||||
if (outgoing)
|
if (outgoing) {
|
||||||
proxy_disconnect(peer.service_node ? ConnectionID{peer.pubkey} : conn_index_to_id[conn_index], 1s);
|
proxy_disconnect(peer.service_node ? ConnectionID{peer.pubkey} : conn_index_to_id[conn_index], 1s);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
else
|
else
|
||||||
send_routed_message(connections[conn_index], peer.route, "BYE");
|
reply = "BYE";
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (reply.empty())
|
if (reply.empty())
|
||||||
return true;
|
return true;
|
||||||
|
|
||||||
if (outgoing)
|
try {
|
||||||
send_direct_message(connections[conn_index], std::move(reply), command);
|
if (outgoing)
|
||||||
else
|
send_direct_message(connections[conn_index], std::move(reply), command, zmq::send_flags::dontwait);
|
||||||
send_routed_message(connections[conn_index], peer.route, std::move(reply), command);
|
else
|
||||||
|
send_routed_message(connections[conn_index], peer.route, std::move(reply), command, zmq::send_flags::dontwait);
|
||||||
|
} catch (const zmq::error_t&) { /* can't send: possibly already disconnected. Ignore. */ }
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -38,40 +38,41 @@ inline zmq::message_t create_message(string_view data) {
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename It>
|
template <typename It>
|
||||||
void send_message_parts(zmq::socket_t &sock, It begin, It end) {
|
void send_message_parts(zmq::socket_t &sock, It begin, It end, const zmq::send_flags flags = zmq::send_flags::none) {
|
||||||
while (begin != end) {
|
while (begin != end) {
|
||||||
// FIXME: for outgoing connections on ZMQ_DEALER we want to use ZMQ_DONTWAIT and handle
|
|
||||||
// EAGAIN error (which either means the peer HWM is hit -- probably indicating a connection
|
|
||||||
// failure -- or the underlying connect() system call failed). Assuming it's an outgoing
|
|
||||||
// connection, we should destroy it.
|
|
||||||
zmq::message_t &msg = *begin++;
|
zmq::message_t &msg = *begin++;
|
||||||
sock.send(msg, begin == end ? zmq::send_flags::none : zmq::send_flags::sndmore);
|
sock.send(msg, begin == end ? flags : flags | zmq::send_flags::sndmore);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename Container>
|
template <typename Container>
|
||||||
void send_message_parts(zmq::socket_t &sock, Container &&c) {
|
void send_message_parts(zmq::socket_t &sock, Container &&c, zmq::send_flags flags = zmq::send_flags::none) {
|
||||||
send_message_parts(sock, c.begin(), c.end());
|
send_message_parts(sock, c.begin(), c.end(), flags);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Sends a message with an initial route. `msg` and `data` can be empty: if `msg` is empty then
|
/// Sends a message with an initial route. `msg` and `data` can be empty: if `msg` is empty then
|
||||||
/// the msg frame will be an empty message; if `data` is empty then the data frame will be omitted.
|
/// the msg frame will be an empty message; if `data` is empty then the data frame will be omitted.
|
||||||
inline void send_routed_message(zmq::socket_t &socket, std::string route, std::string msg = {}, std::string data = {}) {
|
/// `flags` is passed through to zmq: typically given `zmq::send_flags::dontwait` to throw rather
|
||||||
|
/// than block if a message can't be queued.
|
||||||
|
inline void send_routed_message(zmq::socket_t &socket, std::string route, std::string msg = {}, std::string data = {},
|
||||||
|
zmq::send_flags flags = zmq::send_flags::none) {
|
||||||
assert(!route.empty());
|
assert(!route.empty());
|
||||||
std::array<zmq::message_t, 3> msgs{{create_message(std::move(route))}};
|
std::array<zmq::message_t, 3> msgs{{create_message(std::move(route))}};
|
||||||
if (!msg.empty())
|
if (!msg.empty())
|
||||||
msgs[1] = create_message(std::move(msg));
|
msgs[1] = create_message(std::move(msg));
|
||||||
if (!data.empty())
|
if (!data.empty())
|
||||||
msgs[2] = create_message(std::move(data));
|
msgs[2] = create_message(std::move(data));
|
||||||
send_message_parts(socket, msgs.begin(), data.empty() ? std::prev(msgs.end()) : msgs.end());
|
send_message_parts(socket, msgs.begin(), data.empty() ? std::prev(msgs.end()) : msgs.end(), flags);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sends some stuff to a socket directly.
|
// Sends some stuff to a socket directly. If dontwait is true then we throw instead of blocking if
|
||||||
inline void send_direct_message(zmq::socket_t &socket, std::string msg, std::string data = {}) {
|
// the message cannot be accepted by zmq (i.e. because the outgoing buffer is full).
|
||||||
|
inline void send_direct_message(zmq::socket_t &socket, std::string msg, std::string data = {},
|
||||||
|
zmq::send_flags flags = zmq::send_flags::none) {
|
||||||
std::array<zmq::message_t, 2> msgs{{create_message(std::move(msg))}};
|
std::array<zmq::message_t, 2> msgs{{create_message(std::move(msg))}};
|
||||||
if (!data.empty())
|
if (!data.empty())
|
||||||
msgs[1] = create_message(std::move(data));
|
msgs[1] = create_message(std::move(data));
|
||||||
send_message_parts(socket, msgs.begin(), data.empty() ? std::prev(msgs.end()) : msgs.end());
|
send_message_parts(socket, msgs.begin(), data.empty() ? std::prev(msgs.end()) : msgs.end(), flags);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Receive all the parts of a single message from the given socket. Returns true if a message was
|
// Receive all the parts of a single message from the given socket. Returns true if a message was
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
#include "lokimq.h"
|
#include "lokimq.h"
|
||||||
#include "batch.h"
|
#include "batch.h"
|
||||||
|
#include "hex.h"
|
||||||
#include "lokimq-internal.h"
|
#include "lokimq-internal.h"
|
||||||
|
|
||||||
namespace lokimq {
|
namespace lokimq {
|
||||||
|
@ -230,10 +231,13 @@ void LokiMQ::proxy_to_worker(size_t conn_index, std::vector<zmq::message_t>& par
|
||||||
auto cat_call = get_command(command);
|
auto cat_call = get_command(command);
|
||||||
|
|
||||||
if (!cat_call.first) {
|
if (!cat_call.first) {
|
||||||
if (outgoing)
|
LMQ_LOG(warn, "Invalid command '", command, "' sent by remote [", to_hex(peer->pubkey), "]/", peer_address(parts.back()));
|
||||||
send_direct_message(connections[conn_index], "UNKNOWNCOMMAND", command);
|
try {
|
||||||
else
|
if (outgoing)
|
||||||
send_routed_message(connections[conn_index], peer->route, "UNKNOWNCOMMAND", command);
|
send_direct_message(connections[conn_index], "UNKNOWNCOMMAND", command, zmq::send_flags::dontwait);
|
||||||
|
else
|
||||||
|
send_routed_message(connections[conn_index], peer->route, "UNKNOWNCOMMAND", command, zmq::send_flags::dontwait);
|
||||||
|
} catch (const zmq::error_t&) { /* can't send: possibly already disconnected. Ignore. */ }
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue