oxen-mq/oxenmq/proxy.cpp
Jason Rhinelander caadd35052
epoll: fix hang on heavily loaded sockets
This fixes a hang in the epoll code that triggers on heavy, bursty
connections (such as the live SPNS APNs notifier).

It turns out that side-effects of processing our sockets could leave
other sockets (that we processed earlier in the loop) in a
needs-attention state which we might not notice if we go back to
epoll_wait right away.  zmq::poll apparently takes care of this (and so
is safe to re-poll even in this state), but when we are using epoll we
need to worry about it by always checking for zmq events (which itself
has side effects) and, if we get any, re-enter the loop body immediately
*without* polling to deal with them.
2023-09-15 18:29:23 -03:00

844 lines
35 KiB
C++

#include "oxenmq.h"
#include "oxenmq-internal.h"
#include <oxenc/hex.h>
#include <exception>
#include <future>
#if defined(__FreeBSD__) || defined(__NetBSD__) || defined(__OpenBSD__)
extern "C" {
#include <pthread.h>
#include <pthread_np.h>
}
#endif
#ifdef OXENMQ_USE_EPOLL
#include <sys/epoll.h>
#endif
#ifndef _WIN32
extern "C" {
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
}
#endif
namespace oxenmq {
// Final teardown of the proxy thread: closes the command and worker sockets, flags the proxy as
// shutting down, applies the close linger to every remaining connection socket and then drops all
// connection and peer state.  Must only be called once every worker thread has been joined.
void OxenMQ::proxy_quit() {
    OMQ_LOG(debug, "Received quit command, shutting down proxy thread");

    // No regular or tagged worker thread may still be joinable at this point.
    assert(!std::any_of(workers.begin(), workers.end(),
                [](auto& w) { return w.worker_thread.joinable(); }));
    assert(!std::any_of(tagged_workers.begin(), tagged_workers.end(),
                [](auto& tagged) { return std::get<0>(tagged).worker_thread.joinable(); }));

    command.set(zmq::sockopt::linger, 0);
    command.close();
    {
        // Set the flag under the lock so that other threads stop opening new control sockets.
        std::lock_guard guard{control_sockets_mutex};
        proxy_shutting_down = true;
    }
    workers_socket.close();

    // Give each remaining connection the configured linger before the sockets are destroyed.
    const int linger_ms = std::chrono::milliseconds{CLOSE_LINGER}.count();
    for (auto& id_and_socket : connections)
        id_and_socket.second.set(zmq::sockopt::linger, linger_ms);
    connections.clear();
    peers.clear();

    OMQ_LOG(debug, "Proxy thread teardown complete");
}
// Handles a "SEND" proxy control command: decodes the bt-encoded send request, selects a socket
// for the target connection, attempts the send and schedules whichever callbacks apply.
//
// Recognized dict keys (consumed in alphabetical order): conn_id or conn_pubkey (exactly one is
// required), conn_route, hint, incoming, keep_alive, optional, outgoing, request,
// request_callback / request_tag / request_timeout (the first two are required when request is
// set), send (required), send_fail, send_full_q.
//
// Throws std::runtime_error if the command is malformed; since these commands are generated
// internally that indicates a bug in the caller.
void OxenMQ::proxy_send(oxenc::bt_dict_consumer data) {
    // NB: bt_dict_consumer goes in alphabetical order
    std::string_view hint;
    std::chrono::milliseconds keep_alive{DEFAULT_SEND_KEEP_ALIVE};
    std::chrono::milliseconds request_timeout{DEFAULT_REQUEST_TIMEOUT};
    bool optional = false;
    bool outgoing = false;
    bool incoming = false;
    bool request = false;
    bool have_conn_id = false;
    ConnectionID conn_id;
    std::string request_tag;
    ReplyCallback request_callback;

    if (data.skip_until("conn_id")) {
        conn_id.id = data.consume_integer<long long>();
        if (conn_id.id == -1)
            throw std::runtime_error("Invalid error: invalid conn_id value (-1)");
        have_conn_id = true;
    }
    if (data.skip_until("conn_pubkey")) {
        if (have_conn_id)
            throw std::runtime_error("Internal error: Invalid proxy send command; conn_id and conn_pubkey are exclusive");
        conn_id.pk = data.consume_string();
        conn_id.id = ConnectionID::SN_ID;
    } else if (!have_conn_id)
        throw std::runtime_error("Internal error: Invalid proxy send command; conn_pubkey or conn_id missing");
    if (data.skip_until("conn_route"))
        conn_id.route = data.consume_string();
    if (data.skip_until("hint"))
        hint = data.consume_string_view();
    if (data.skip_until("incoming"))
        incoming = data.consume_integer<bool>();
    if (data.skip_until("keep_alive"))
        keep_alive = std::chrono::milliseconds{data.consume_integer<uint64_t>()};
    if (data.skip_until("optional"))
        optional = data.consume_integer<bool>();
    if (data.skip_until("outgoing"))
        outgoing = data.consume_integer<bool>();
    if (data.skip_until("request"))
        request = data.consume_integer<bool>();
    if (request) {
        if (!data.skip_until("request_callback"))
            throw std::runtime_error("Internal error: received request without request_callback");
        request_callback = detail::deserialize_object<ReplyCallback>(data.consume_integer<uintptr_t>());
        if (!data.skip_until("request_tag"))
            throw std::runtime_error("Internal error: received request without request_name");
        request_tag = data.consume_string();
        if (data.skip_until("request_timeout"))
            request_timeout = std::chrono::milliseconds{data.consume_integer<uint64_t>()};
    }
    if (!data.skip_until("send"))
        throw std::runtime_error("Internal error: Invalid proxy send command; send parts missing");
    oxenc::bt_list_consumer send = data.consume_list_consumer();

    send_option::queue_failure::callback_t callback_nosend;
    if (data.skip_until("send_fail"))
        callback_nosend = detail::deserialize_object<decltype(callback_nosend)>(data.consume_integer<uintptr_t>());
    send_option::queue_full::callback_t callback_noqueue;
    if (data.skip_until("send_full_q"))
        callback_noqueue = detail::deserialize_object<decltype(callback_noqueue)>(data.consume_integer<uintptr_t>());

    // Now figure out which socket to send to and do the actual sending.  We can repeat this loop
    // multiple times, if we're sending to a SN, because it's possible that we have multiple
    // connections open to that SN (e.g. one out + one in) so if one fails we can clean up that
    // connection and try the next one.
    bool retry = true, sent = false, nowarn = false;
    while (retry) {
        retry = false;
        zmq::socket_t *send_to;
        if (conn_id.sn()) {
            auto sock_route = proxy_connect_sn(conn_id.pk, hint, optional, incoming, outgoing, EPHEMERAL_ROUTING_ID, keep_alive);
            if (!sock_route.first) {
                nowarn = true;
                if (optional)
                    OMQ_LOG(debug, "Not sending: send is optional and no connection to ",
                            oxenc::to_hex(conn_id.pk), " is currently established");
                else
                    OMQ_LOG(error, "Unable to send to ", oxenc::to_hex(conn_id.pk), ": no valid connection address found");
                break;
            }
            send_to = sock_route.first;
            conn_id.route = std::move(sock_route.second);
        } else if (!conn_id.route.empty()) { // incoming non-SN connection
            auto it = connections.find(conn_id.id);
            if (it == connections.end()) {
                OMQ_LOG(warn, "Unable to send to ", conn_id, ": incoming listening socket not found");
                break;
            }
            send_to = &it->second;
        } else {
            auto pr = peers.equal_range(conn_id);
            // An empty range means "no such peer" regardless of the container's ordering.
            if (pr.first == pr.second) {
                OMQ_LOG(warn, "Unable to send: connection id ", conn_id, " is not (or is no longer) a valid outgoing connection");
                break;
            }
            auto& peer = pr.first->second;
            auto it = connections.find(peer.conn_id);
            if (it == connections.end()) {
                OMQ_LOG(warn, "Unable to send: peer connection id ", conn_id, " is not (or is no longer) a valid outgoing connection");
                break;
            }
            send_to = &it->second;
        }

        try {
            sent = send_message_parts(*send_to, build_send_parts(send, conn_id.route));
        } catch (const zmq::error_t &e) {
            if (e.num() == EHOSTUNREACH && !conn_id.route.empty() /*= incoming conn*/) {
                OMQ_LOG(debug, "Incoming connection is no longer valid; removing peer details");

                auto pr = peers.equal_range(conn_id);
                if (pr.first != pr.second) {
                    if (!conn_id.sn()) {
                        peers.erase(pr.first);
                    } else {
                        // Must start out false: it is read below even when no route matched.
                        bool removed = false;
                        // Advance on every non-matching entry; without the increment a
                        // non-matching first entry would make this loop spin forever.
                        for (auto it = pr.first; it != pr.second; ++it) {
                            auto& peer = it->second;
                            if (peer.route == conn_id.route) {
                                peers.erase(it);
                                removed = true;
                                break;
                            }
                        }
                        // The incoming connection to the SN is no longer good, but we can retry because
                        // we may have another active connection with the SN (or may want to open one).
                        if (removed) {
                            OMQ_LOG(debug, "Retrying sending to SN ", oxenc::to_hex(conn_id.pk), " using other sockets");
                            retry = true;
                        }
                    }
                }
            }
            if (!retry) {
                if (!conn_id.sn() && !conn_id.route.empty()) { // incoming non-SN connection
                    OMQ_LOG(debug, "Unable to send message to incoming connection ", conn_id, ": ", e.what(),
                            "; remote has probably disconnected");
                } else {
                    OMQ_LOG(warn, "Unable to send message to ", conn_id, ": ", e.what());
                }
                nowarn = true;
                if (callback_nosend) {
                    job([callback = std::move(callback_nosend), error = e] { callback(&error); });
                    // Cleared so that the generic !sent handling below does not fire it again.
                    callback_nosend = nullptr;
                }
            }
        }
    }

    if (request) {
        if (sent) {
            OMQ_LOG(debug, "Added new pending request ", oxenc::to_hex(request_tag));
            pending_requests.insert({ request_tag, {
                std::chrono::steady_clock::now() + request_timeout, std::move(request_callback) }});
        } else {
            OMQ_LOG(debug, "Could not send request, scheduling request callback failure");
            job([callback = std::move(request_callback)] { callback(false, {{"TIMEOUT"s}}); });
        }
    }
    if (!sent) {
        // Failure notification priority: the send-failure callback, then the queue-full callback,
        // and otherwise a warning unless one has already been logged above.
        if (callback_nosend)
            job([callback = std::move(callback_nosend)] { callback(nullptr); });
        else if (callback_noqueue)
            job(std::move(callback_noqueue));
        else if (!nowarn)
            OMQ_LOG(warn, "Unable to send message to ", conn_id, ": sending would block");
    }
}
// Handles a "REPLY" proxy control command: decodes the target connection (conn_id or conn_pubkey,
// exactly one of which is required) and the "send" parts, then tries each known peer entry for
// that connection until a send succeeds.
//
// Throws std::runtime_error if the command is malformed; these commands are generated internally,
// so that indicates a bug in the caller.
void OxenMQ::proxy_reply(oxenc::bt_dict_consumer data) {
    bool have_conn_id = false;
    ConnectionID conn_id{0};
    if (data.skip_until("conn_id")) {
        conn_id.id = data.consume_integer<long long>();
        if (conn_id.id == -1)
            throw std::runtime_error("Invalid error: invalid conn_id value (-1)");
        have_conn_id = true;
    }
    if (data.skip_until("conn_pubkey")) {
        if (have_conn_id)
            throw std::runtime_error("Internal error: Invalid proxy reply command; conn_id and conn_pubkey are exclusive");
        conn_id.pk = data.consume_string();
        conn_id.id = ConnectionID::SN_ID;
    } else if (!have_conn_id)
        throw std::runtime_error("Internal error: Invalid proxy reply command; conn_pubkey or conn_id missing");
    if (!data.skip_until("send"))
        throw std::runtime_error("Internal error: Invalid proxy reply command; send parts missing");

    oxenc::bt_list_consumer send = data.consume_list_consumer();

    auto pr = peers.equal_range(conn_id);
    if (pr.first == pr.second) {
        OMQ_LOG(warn, "Unable to send tagged reply: the connection is no longer valid");
        return;
    }

    // We try any connections until one works (for ordinary remotes there will be just one, but for
    // SNs there might be one incoming and one outgoing).
    for (auto it = pr.first; it != pr.second; ) {
        try {
            // NOTE(review): operator[] would create an entry if conn_id were missing from
            // `connections`; presumably every peer refers to a live connection -- confirm.
            send_message_parts(connections[it->second.conn_id], build_send_parts(send, it->second.route));
            break;
        } catch (const zmq::error_t &err) {
            if (err.num() == EHOSTUNREACH) {
                if (it->second.outgoing()) {
                    OMQ_LOG(debug, "Unable to send reply to non-SN request on outgoing socket: "
                            "remote is no longer connected; closing connection");
                    proxy_close_connection(it->second.conn_id, CLOSE_LINGER);
                    // erase() already yields the next candidate; advancing again would skip an
                    // entry and could step past the end of the range.
                    it = peers.erase(it);
                } else {
                    OMQ_LOG(debug, "Unable to send reply to non-SN request on incoming socket: "
                            "remote is no longer connected; removing peer details");
                    it = peers.erase(it);
                }
            } else {
                OMQ_LOG(warn, "Unable to send reply to incoming non-SN request: ", err.what());
                ++it;
            }
        }
    }
}
// Dispatches one internal control message received on the proxy's command socket.
//
// parts[0] is the sender's route and parts[1] the command name; three-part messages carry a
// payload in parts[2].  `len` is the number of populated entries in `parts`.
//
// Throws std::logic_error for fewer than two parts and std::runtime_error for an unrecognized
// command/length combination.
void OxenMQ::proxy_control_message(OxenMQ::control_message_array& parts, size_t len) {
    // Control messages are only ever generated inside oxenmq itself, so anything malformed here is
    // a bug in this library; we let the exception propagate uncaught.
    if (len < 2)
        throw std::logic_error("OxenMQ bug: Expected 2-3 message parts for a proxy control message");

    auto route = view(parts[0]);
    auto cmd = view(parts[1]);
    OMQ_TRACE("control message: ", cmd);

    if (len == 2) {
        if (cmd == "START") {
            // Sent by the owning thread during startup; answer READY so that it knows the proxy is
            // up and running.
            route_control(command, route, "READY");
            return;
        }
        if (cmd == "QUIT") {
            // Shutdown request: drop max_workers to zero and tell every idle worker to quit.
            // Busy workers are closed as they come back to READY status, and external
            // connections are closed once all workers are done.
            max_workers = 0;
            for (size_t idx = 0; idx < idle_worker_count; idx++)
                route_control(workers_socket, workers[idle_workers[idx]].worker_routing_id, "QUIT");
            idle_worker_count = 0;
            for (auto& [info, is_busy, backlog] : tagged_workers) {
                if (is_busy)
                    continue;
                route_control(workers_socket, info.worker_routing_id, "QUIT");
            }
            return;
        }
    } else if (len == 3) {
        OMQ_TRACE("...: ", parts[2]);
        auto payload = view(parts[2]);

        if (cmd == "SEND") {
            OMQ_TRACE("proxying message");
            proxy_send(payload);
            return;
        }
        if (cmd == "REPLY") {
            OMQ_TRACE("proxying reply to non-SN incoming message");
            proxy_reply(payload);
            return;
        }
        if (cmd == "BATCH") {
            OMQ_TRACE("proxy batch jobs");
            auto batch_addr = oxenc::bt_deserialize<uintptr_t>(payload);
            proxy_batch(reinterpret_cast<detail::Batch*>(batch_addr));
            return;
        }
        if (cmd == "INJECT") {
            OMQ_TRACE("proxy inject");
            proxy_inject_task(detail::deserialize_object<injected_task>(oxenc::bt_deserialize<uintptr_t>(payload)));
            return;
        }
        if (cmd == "SET_SNS") {
            proxy_set_active_sns(payload);
            return;
        }
        if (cmd == "UPDATE_SNS") {
            proxy_update_active_sns(payload);
            return;
        }
        if (cmd == "CONNECT_SN") {
            proxy_connect_sn(payload);
            return;
        }
        if (cmd == "CONNECT_REMOTE") {
            proxy_connect_remote(payload);
            return;
        }
        if (cmd == "DISCONNECT") {
            proxy_disconnect(payload);
            return;
        }
        if (cmd == "TIMER") {
            proxy_timer(payload);
            return;
        }
        if (cmd == "TIMER_DEL") {
            proxy_timer_del(oxenc::bt_deserialize<int>(payload));
            return;
        }
        if (cmd == "BIND") {
            // Only remember the binding if the listener was actually established.
            auto binding = detail::deserialize_object<bind_data>(oxenc::bt_deserialize<uintptr_t>(payload));
            if (proxy_bind(binding, bind.size()))
                bind.push_back(std::move(binding));
            return;
        }
    }

    throw std::runtime_error("OxenMQ bug: Proxy received invalid control command: " +
            std::string{cmd} + " (" + std::to_string(len) + ")");
}
// Creates a router socket for `b`, configures it via setup_incoming_socket and tries to bind it
// to b.address.  The optional b.on_bind callback is invoked once with the outcome and then
// cleared.  On success the socket is assigned the next connection id and stored in `connections`.
//
// Returns true if the bind succeeded, false otherwise.
bool OxenMQ::proxy_bind(bind_data& b, size_t bind_index) {
    zmq::socket_t listener{context, zmq::socket_type::router};
    setup_incoming_socket(listener, b.curve, pubkey, privkey, bind_index);

    bool bound;
    try {
        listener.bind(b.address);
        bound = true;
    } catch (const zmq::error_t&) {
        bound = false;
    }

    // Report the outcome (once) before doing any logging or bookkeeping.
    if (b.on_bind) {
        b.on_bind(bound);
        b.on_bind = nullptr;
    }

    if (bound) {
        OMQ_LOG(info, "OxenMQ listening on ", b.address);

        b.conn_id = next_conn_id++;
        connections.emplace_hint(connections.end(), b.conn_id, std::move(listener));
        // Signal that the set of connections has changed.
        connections_updated = true;
        return true;
    }

    OMQ_LOG(warn, "OxenMQ failed to listen on ", b.address);
    return false;
}
// One-time setup performed on the proxy thread before the main loop starts: names the thread,
// creates and binds the ZAP, worker and in-process listener sockets, binds every configured
// listener in `bind`, registers the periodic connection-cleanup timer, and completes the startup
// handshake with any tagged worker threads.
//
// Throws on failure: std::logic_error if worker state already exists, zmq::error_t if a configured
// listener cannot be bound or the timer cannot be added, and std::runtime_error if chown of an
// ipc socket fails.
void OxenMQ::proxy_loop_init() {
// Name the thread "omq-proxy"; the call used depends on the platform.
#if defined(__linux__) || defined(__sun) || defined(__MINGW32__)
pthread_setname_np(pthread_self(), "omq-proxy");
#elif defined(__FreeBSD__) || defined(__NetBSD__) || defined(__OpenBSD__)
pthread_set_name_np(pthread_self(), "omq-proxy");
#elif defined(__MACH__)
pthread_setname_np("omq-proxy");
#endif
zap_auth = zmq::socket_t{context, zmq::socket_type::rep};
zap_auth.set(zmq::sockopt::linger, 0);
zap_auth.bind(ZMQ_ADDR_ZAP);
workers_socket = zmq::socket_t{context, zmq::socket_type::router};
workers_socket.set(zmq::sockopt::router_mandatory, true);
workers_socket.bind(SN_ADDR_WORKERS);
workers.reserve(max_workers);
idle_workers.resize(max_workers);
// The proxy must start from a clean slate: no workers or worker sockets may exist yet.
if (!workers.empty() || !worker_sockets.empty())
throw std::logic_error("Internal error: proxy thread started with active worker threads");
worker_sockets.reserve(max_workers);
// Pre-initialize these worker sockets rather than creating during thread initialization so that
// we can't hit the zmq socket limit during worker thread startup.
for (int i = 0; i < max_workers; i++)
worker_sockets.emplace_back(context, zmq::socket_type::dealer);
#ifndef _WIN32
// Apply STARTUP_UMASK (when non-negative) while the listeners below are bound; the previous
// umask is restored once binding has finished.
int saved_umask = -1;
if (STARTUP_UMASK >= 0)
saved_umask = umask(STARTUP_UMASK);
#endif
{
// In-process listener bound to SN_ADDR_SELF; it gets its own connection id.
zmq::socket_t inproc_listener{context, zmq::socket_type::router};
inproc_listener.bind(SN_ADDR_SELF);
inproc_listener_connid = next_conn_id++;
connections.emplace_hint(connections.end(), inproc_listener_connid, std::move(inproc_listener));
}
// Bind every configured listener; a failure here aborts startup.
for (size_t i = 0; i < bind.size(); i++) {
if (!proxy_bind(bind[i], i)) {
OMQ_LOG(fatal, "OxenMQ failed to listen on ", bind[i].address);
throw zmq::error_t{};
}
}
#ifndef _WIN32
if (saved_umask != -1)
umask(saved_umask);
// set socket gid / uid if it is provided
if (SOCKET_GID != -1 or SOCKET_UID != -1) {
for (auto& b : bind) {
const address addr(b.address);
if (addr.ipc())
if (chown(addr.socket.c_str(), SOCKET_UID, SOCKET_GID) == -1)
throw std::runtime_error("cannot set group on " + addr.socket + ": " + strerror(errno));
}
}
#endif
connections_updated = true;
// Also add an internal connection to self so that calling code can avoid needing to
// special-case rare situations where we are supposed to talk to a quorum member that happens to
// be ourselves (which can happen, for example, with cross-quorum Blink communication)
// FIXME: not working
//listener.bind(SN_ADDR_SELF);
if (!timers)
timers.reset(zmq_timers_new());
// Register a recurring timer that runs proxy_conn_cleanup() every CONN_CHECK_INTERVAL.
if (-1 == zmq_timers_add(timers.get(),
std::chrono::milliseconds{CONN_CHECK_INTERVAL}.count(),
[](int /*timer_id*/, void* self) { static_cast<OxenMQ*>(self)->proxy_conn_cleanup(); },
this)) {
throw zmq::error_t{};
}
// Wait for tagged worker threads to get ready and connect to us (we get a "STARTING" message)
// and send them back a "START" to let them know to go ahead with startup.  We need this
// synchronization dance to guarantee that the workers are routable before we can proceed.
if (!tagged_workers.empty()) {
OMQ_LOG(debug, "Waiting for tagged workers");
{
// Flip the go flag under the mutex, then wake everything waiting on tagged_cv.
std::unique_lock lock{tagged_startup_mutex};
tagged_go = tagged_go_mode::GO;
}
tagged_cv.notify_all();
// Routing ids of the tagged workers that have not yet sent STARTING.
std::unordered_set<std::string_view> waiting_on;
for (auto& w : tagged_workers)
waiting_on.emplace(std::get<run_info>(w).worker_routing_id);
// Receive on the worker socket until every tagged worker has checked in; `parts` is reused
// and cleared after each iteration.
for (std::vector<zmq::message_t> parts; !waiting_on.empty(); parts.clear()) {
recv_message_parts(workers_socket, parts);
if (parts.size() != 2 || view(parts[1]) != "STARTING"sv) {
OMQ_LOG(error, "Received invalid message on worker socket while waiting for tagged thread startup");
continue;
}
OMQ_LOG(debug, "Received STARTING message from ", view(parts[0]));
if (auto it = waiting_on.find(view(parts[0])); it != waiting_on.end())
waiting_on.erase(it);
else
OMQ_LOG(error, "Received STARTING message from unknown worker ", view(parts[0]));
}
// Everyone has checked in: release all tagged workers to finish their startup.
for (auto&w : tagged_workers) {
OMQ_LOG(debug, "Telling tagged thread worker ", std::get<run_info>(w).worker_routing_name, " to finish startup");
route_control(workers_socket, std::get<run_info>(w).worker_routing_id, "START");
}
}
}
// Body of the proxy thread.  Runs proxy_loop_init() and reports the outcome through `startup`
// (an exception on failure, a value on success), then loops: wait for socket activity (epoll or
// zmq::poll, depending on OXENMQ_USE_EPOLL), process control messages, worker messages, timers,
// ZAP requests and queued jobs, and finally pull incoming messages off the connection sockets in
// round-robin order.  Returns after proxy_quit() once a quit was requested and no worker thread
// is joinable any more.
void OxenMQ::proxy_loop(std::promise<void> startup) {
try {
proxy_loop_init();
} catch (...) {
// Hand the initialization failure to whoever holds the future and end the thread.
startup.set_exception(std::current_exception());
return;
}
startup.set_value();
// Fixed array used for worker and control messages: these are never longer than 3 parts:
std::array<zmq::message_t, 3> control_parts;
// General vector for handling incoming messages:
std::vector<zmq::message_t> parts;
// Entries point into `connections`; one slot always holds a nullptr placeholder so that a
// socket can be re-queued while the buffer is being walked.
std::vector<std::pair<const int64_t, zmq::socket_t>*> queue; // Used as a circular buffer
#ifdef OXENMQ_USE_EPOLL
std::vector<struct epoll_event> evs;
#endif
while (true) {
std::chrono::milliseconds poll_timeout;
if (max_workers == 0) { // Will be 0 only if we are quitting
if (std::none_of(workers.begin(), workers.end(), [](auto &w) { return w.worker_thread.joinable(); }) &&
std::none_of(tagged_workers.begin(), tagged_workers.end(), [](auto &w) { return std::get<0>(w).worker_thread.joinable(); })) {
// All the workers have finished, so we can finish shutting down
return proxy_quit();
}
poll_timeout = 1s; // We don't keep running timers when we're quitting, so don't have a timer to check
} else {
// Wake up no later than the next timer is due.
poll_timeout = std::chrono::milliseconds{zmq_timers_timeout(timers.get())};
}
if (connections_updated) {
rebuild_pollitems();
// If we just rebuilt the queue then do a full check of everything, because we might
// have sockets that already edge-triggered that we need to fully drain before we start
// polling.
proxy_skip_one_poll = true;
}
// We round-robin connections when pulling off pending messages one-by-one rather than
// pulling off all messages from one connection before moving to the next; thus in cases of
// contention we end up fairly distributing.
queue.reserve(connections.size() + 1);
#ifdef OXENMQ_USE_EPOLL
bool process_command = false, process_worker = false, process_zap = false, process_all = false;
if (proxy_skip_one_poll) {
// Skip epoll for this iteration: query each internal socket's zmq event state directly and
// flag that every connection socket should be checked further down.
proxy_skip_one_poll = false;
process_command = command.get(zmq::sockopt::events) & ZMQ_POLLIN;
process_worker = workers_socket.get(zmq::sockopt::events) & ZMQ_POLLIN;
process_zap = zap_auth.get(zmq::sockopt::events) & ZMQ_POLLIN;
process_all = true;
}
else {
OMQ_TRACE("polling for new messages via epoll");
// Room for the three internal sockets plus every connection.
evs.resize(3 + connections.size());
const int max = epoll_wait(epoll_fd, evs.data(), evs.size(), poll_timeout.count());
queue.clear();
// Map each reported event back to an internal socket or a connection; ids that are no
// longer in `connections` are ignored.
for (int i = 0; i < max; i++) {
const auto conn_id = evs[i].data.u64;
if (conn_id == EPOLL_COMMAND_ID)
process_command = true;
else if (conn_id == EPOLL_WORKER_ID)
process_worker = true;
else if (conn_id == EPOLL_ZAP_ID)
process_zap = true;
else if (auto it = connections.find(conn_id); it != connections.end())
queue.push_back(&*it);
}
// Placeholder slot used by the circular-buffer walk below.
queue.push_back(nullptr);
}
#else
if (proxy_skip_one_poll)
proxy_skip_one_poll = false;
else {
OMQ_TRACE("polling for new messages");
// We poll the control socket and worker socket for any incoming messages.  If we have
// available worker room then also poll incoming connections and outgoing connections
// for messages to forward to a worker.  Otherwise, we just look for a control message
// or a worker coming back with a ready message.
zmq::poll(pollitems.data(), pollitems.size(), poll_timeout);
}
// Without epoll every socket is checked on every iteration.
constexpr bool process_command = true, process_worker = true, process_zap = true, process_all = true;
#endif
if (process_command) {
OMQ_TRACE("processing control messages");
// Drain the command socket without blocking.
while (size_t len = recv_message_parts(command, control_parts, zmq::recv_flags::dontwait))
proxy_control_message(control_parts, len);
}
if (process_worker) {
OMQ_TRACE("processing worker messages");
// Drain the worker socket without blocking.
while (size_t len = recv_message_parts(workers_socket, control_parts, zmq::recv_flags::dontwait))
proxy_worker_message(control_parts, len);
}
OMQ_TRACE("processing timers");
zmq_timers_execute(timers.get());
if (process_zap) {
// Handle any zap authentication
OMQ_TRACE("processing zap requests");
process_zap_requests();
}
// See if we can drain anything from the current queue before we potentially add to it
// below.
OMQ_TRACE("processing queued jobs and messages");
proxy_process_queue();
OMQ_TRACE("processing new incoming messages");
if (process_all) {
// Rebuild the queue from every connection that currently reports readable input.
queue.clear();
for (auto& id_sock : connections)
if (id_sock.second.get(zmq::sockopt::events) & ZMQ_POLLIN)
queue.push_back(&id_sock);
queue.push_back(nullptr);
}
// `end` is the index of the free slot; `pos` walks the buffer, wrapping around, until it
// catches up with `end`.
size_t end = queue.size() - 1;
for (size_t pos = 0; pos != end; ++pos %= queue.size()) {
parts.clear();
auto& [id, sock] = *queue[pos];
// Nothing (more) to read from this socket: it is not re-queued.
if (!recv_message_parts(sock, parts, zmq::recv_flags::dontwait))
continue;
// We only pull this one message now but then requeue the socket so that after we check
// all other sockets we come back to this one to check again.
queue[end] = queue[pos];
++end %= queue.size();
if (parts.empty()) {
OMQ_LOG(warn, "Ignoring empty (0-part) incoming message");
continue;
}
// Builtin protocol commands are handled right here; anything else goes to a worker.
if (!proxy_handle_builtin(id, sock, parts))
proxy_to_worker(id, sock, parts);
if (connections_updated) {
// If connections got updated then our pointers are stale, so restart the proxy loop;
// we'll immediately end up right back here at least once before we resume polling.
OMQ_TRACE("connections became stale; short-circuiting incoming message loop");
break;
}
}
#ifdef OXENMQ_USE_EPOLL
// If any socket still has ZMQ_POLLIN (which is possible if something we did above changed
// state on another socket, perhaps by writing to it) then we need to repeat the loop
// *without* going back to epoll again, until we get through everything without any
// ZMQ_POLLIN sockets.  If we didn't, we could miss it and might end up deadlocked because
// of ZMQ's edge-triggered notifications on zmq fd's.
//
// More info on the complexities here at https://github.com/zeromq/libzmq/issues/3641 and
// https://funcptr.net/2012/09/10/zeromq---edge-triggered-notification/
if (!connections_updated && !proxy_skip_one_poll) {
// Check the three internal sockets first, then every connection; stop at the first hit.
for (auto* s : {&command, &workers_socket, &zap_auth}) {
if (s->get(zmq::sockopt::events) & ZMQ_POLLIN) {
proxy_skip_one_poll = true;
break;
}
}
if (!proxy_skip_one_poll) {
for (auto& [id, sock] : connections) {
if (sock.get(zmq::sockopt::events) & ZMQ_POLLIN) {
proxy_skip_one_poll = true;
break;
}
}
}
}
#endif
OMQ_TRACE("done proxy loop");
}
}
// Returns true if `cmd` is exactly one of the error replies a remote can send back in response to
// a command we issued (FORBIDDEN, FORBIDDEN_SN, NOT_A_SERVICE_NODE, UNKNOWNCOMMAND, NO_REPLY_TAG).
// The comparison is case-sensitive.
static bool is_error_response(std::string_view cmd) {
    static constexpr std::string_view error_replies[] = {
        "FORBIDDEN", "FORBIDDEN_SN", "NOT_A_SERVICE_NODE", "UNKNOWNCOMMAND", "NO_REPLY_TAG"};

    for (const auto& reply : error_replies)
        if (cmd == reply)
            return true;
    return false;
}
// Handles the protocol-level builtin commands (REPLY, HI, HELLO, BYE and the error replies such
// as FORBIDDEN / UNKNOWNCOMMAND) directly on the proxy thread.
//
// Returns true if the message was recognized as a builtin -- including when it was rejected or
// ignored -- and false if it should be treated as an ordinary command.
bool OxenMQ::proxy_handle_builtin(int64_t conn_id, zmq::socket_t& sock, std::vector<zmq::message_t>& parts) {
    // Serves both as a flag and as an index offset: messages on a router socket carry a leading
    // route part.
    const size_t incoming = sock.get(zmq::sockopt::type) == ZMQ_ROUTER;

    if (parts.size() < 1 + incoming) {
        OMQ_LOG(warn, "Received empty message; ignoring");
        return true;
    }

    std::string_view route;
    if (incoming)
        route = view(parts[0]);
    std::string_view cmd = view(parts[incoming]);

    OMQ_TRACE("Checking for builtins: '", cmd, "' from ", peer_address(parts.back()));

    if (cmd == "REPLY") {
        const size_t tag_pos = 1 + incoming;
        if (parts.size() <= tag_pos) {
            OMQ_LOG(warn, "Received REPLY without a reply tag; ignoring");
            return true;
        }

        std::string reply_tag{view(parts[tag_pos])};
        auto pending = pending_requests.find(reply_tag);
        if (pending == pending_requests.end()) {
            OMQ_LOG(warn, "Received REPLY with unknown or already handled reply tag (", oxenc::to_hex(reply_tag), "); ignoring");
            return true;
        }

        OMQ_LOG(debug, "Received REPLY for pending command ", oxenc::to_hex(reply_tag), "; scheduling callback");
        // Everything after the tag is reply data that gets handed to the callback.
        std::vector<std::string> data;
        data.reserve(parts.size() - (tag_pos + 1));
        for (size_t i = tag_pos + 1; i < parts.size(); i++)
            data.emplace_back(view(parts[i]));
        proxy_schedule_reply_job([callback=std::move(pending->second.second), data=std::move(data)] {
            callback(true, std::move(data));
        });
        pending_requests.erase(pending);
        return true;
    }

    if (cmd == "HI") {
        if (!incoming) {
            OMQ_LOG(warn, "Got invalid 'HI' message on an outgoing connection; ignoring");
            return true;
        }
        OMQ_LOG(debug, "Incoming client from ", peer_address(parts.back()), " sent HI, replying with HELLO");
        try {
            send_routed_message(sock, std::string{route}, "HELLO");
        } catch (const std::exception &e) {
            OMQ_LOG(warn, "Couldn't reply with HELLO: ", e.what());
        }
        return true;
    }

    if (cmd == "HELLO") {
        if (incoming) {
            OMQ_LOG(warn, "Got invalid 'HELLO' message on an incoming connection; ignoring");
            return true;
        }
        // HELLO is only meaningful for a connection that is still waiting on its handshake.
        auto pending_conn = std::find_if(pending_connects.begin(), pending_connects.end(),
                [&](auto& candidate) { return std::get<int64_t>(candidate) == conn_id; });
        if (pending_conn == pending_connects.end()) {
            OMQ_LOG(warn, "Got invalid 'HELLO' message on an already handshaked incoming connection; ignoring");
            return true;
        }
        auto& pc = *pending_conn;
        auto peer = peers.find(std::get<int64_t>(pc));
        if (peer == peers.end()) {
            OMQ_LOG(warn, "Got invalid 'HELLO' message with invalid conn_id; ignoring");
            return true;
        }

        OMQ_LOG(debug, "Got initial HELLO server response from ", peer_address(parts.back()));
        proxy_schedule_reply_job([on_success=std::move(std::get<ConnectSuccess>(pc)),
                conn=peer->first] {
            on_success(conn);
        });
        pending_connects.erase(pending_conn);
        return true;
    }

    if (cmd == "BYE") {
        if (incoming) {
            OMQ_LOG(warn, "Got invalid 'BYE' command on an incoming socket; ignoring");
        } else {
            OMQ_LOG(debug, "BYE command received; disconnecting from ", peer_address(parts.back()));
            proxy_close_connection(conn_id, 0s);
        }
        return true;
    }

    if (is_error_response(cmd)) {
        // These messages (FORBIDDEN, UNKNOWNCOMMAND, etc.) come back when we tried to invoke
        // something that doesn't exist or that we aren't permitted to access.  They have two
        // forms (the second is only sent by remotes running 1.1.0+), shown here without the
        // routing prefix present on incoming connections:
        // - ["XXX", "whatever.command"]
        // - ["XXX", "REPLY", replytag]
        // The first form is just logged; the second triggers the reply callback with a failure.
        if (parts.size() == (1 + incoming) && cmd == "UNKNOWNCOMMAND") {
            // pre-1.1.0 sent a bare UNKNOWNCOMMAND (without the actual command).  That is not
            // useful, but it is also the *expected* response for things 1.0.5 didn't understand,
            // such as FORBIDDEN_SN, so it is only logged at debug level.
            OMQ_LOG(debug, "Received plain UNKNOWNCOMMAND; remote is probably an older oxenmq.  Ignoring.");
            return true;
        }

        const bool tagged_failure = parts.size() == (3 + incoming) && view(parts[1 + incoming]) == "REPLY";
        if (!tagged_failure) {
            OMQ_LOG(warn, "Received ", cmd, ':', (parts.size() > 1 + incoming ? view(parts[1 + incoming]) : "(unknown command)"sv),
                    " from ", peer_address(parts.back()));
            return true;
        }

        std::string reply_tag{view(parts[2 + incoming])};
        auto pending = pending_requests.find(reply_tag);
        if (pending != pending_requests.end()) {
            OMQ_LOG(debug, "Received ", cmd, " REPLY for pending command ", oxenc::to_hex(reply_tag), "; scheduling failure callback");
            proxy_schedule_reply_job([callback=std::move(pending->second.second), cmd=std::string{cmd}] {
                callback(false, {{std::move(cmd)}});
            });
            pending_requests.erase(pending);
        } else {
            OMQ_LOG(warn, "Received REPLY with unknown or already handled reply tag (", oxenc::to_hex(reply_tag), "); ignoring");
        }
        return true;
    }

    return false;
}
// Starts as much queued work as the current worker capacity allows, in priority order: tagged
// thread tasks, internal batch jobs, reply batch jobs, and finally pending incoming commands.
// Does nothing while shutting down (max_workers == 0).
void OxenMQ::proxy_process_queue() {
    if (max_workers == 0) // shutting down
        return;

    // First: give each idle tagged thread the task at the front of its own queue, if any.
    for (auto& [run, busy, queue] : tagged_workers) {
        if (busy || queue.empty())
            continue;
        busy = true;
        proxy_run_worker(run.load(std::move(queue.front()), false, run.worker_id));
        queue.pop_front();
    }

    // Second: batch jobs; these are internal and so get higher priority.
    proxy_run_batch_jobs(batch_jobs, batch_jobs_reserved, batch_jobs_active, false);

    // Next: reply batch jobs (slightly different from the above: they are triggered externally,
    // but for things we initiated locally).
    proxy_run_batch_jobs(reply_jobs, reply_jobs_reserved, reply_jobs_active, true);

    // Finally: general incoming commands, for as long as there is worker capacity left.
    auto it = pending_commands.begin();
    while (it != pending_commands.end() && active_workers() < max_workers) {
        auto& pending = *it;
        // Bind the category up front; it is still needed after `pending` has been handed off.
        auto& cat = pending.cat;

        const bool has_reserved_slot = cat.active_threads < cat.reserved_threads;
        if (!has_reserved_slot && !(active_workers() < general_workers)) {
            ++it; // no available general or reserved worker spots for this job right now
            continue;
        }

        proxy_run_worker(get_idle_worker().load(std::move(pending)));
        cat.queued--;
        cat.active_threads++;
        assert(cat.queued >= 0);
        it = pending_commands.erase(it);
    }
}
}