epoll: fix hang on heavily loaded sockets

This fixes a hang in the epoll code that triggers on heavy, bursty
connections (such as the live SPNS APNs notifier).

It turns out that side effects of processing our sockets can leave
other sockets (ones we processed earlier in the loop) in a
needs-attention state that we won't notice if we go straight back to
epoll_wait.  zmq::poll apparently takes care of this (and so is safe
to re-poll even in this state), but when using epoll we have to handle
it ourselves by always checking for zmq events (which itself has side
effects) and, if any are pending, re-entering the loop body immediately
*without* polling so that we deal with them.
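
To illustrate the pattern (a sketch for this page, not the oxenmq code
itself): with cppzmq, a socket's pending-read state can be re-checked
after processing via zmq::sockopt::events, and the next blocking
epoll_wait skipped if anything is still readable.  The process() helper
and socket list below are hypothetical.

    // Sketch of the check-before-re-polling pattern, assuming cppzmq >= 4.7.0
    // (for zmq::sockopt::events) and an edge-triggered epoll loop on zmq fds.
    #include <zmq.hpp>
    #include <vector>

    // Hypothetical helper: drain and handle whatever is queued on `sock`.
    void process(zmq::socket_t& sock);

    void poll_loop(std::vector<zmq::socket_t*>& sockets) {
        bool skip_poll = false;  // true => pending work, don't block in epoll
        while (true) {
            if (!skip_poll) {
                // ... epoll_wait() on the sockets' file descriptors here ...
            }
            skip_poll = false;

            for (auto* s : sockets)
                process(*s);  // side effects may make *other* sockets readable

            // zmq fd notifications are edge-triggered, so anything that became
            // readable while we were processing must be caught now; otherwise
            // the next epoll_wait may block forever on an already-readable socket.
            for (auto* s : sockets) {
                if (s->get(zmq::sockopt::events) & ZMQ_POLLIN) {
                    skip_poll = true;  // re-run the loop body without polling
                    break;
                }
            }
        }
    }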
Jason Rhinelander 2023-09-15 15:51:04 -03:00
parent fd58ab9cac
commit caadd35052
GPG Key ID: C4992CE7A88D4262
1 changed file with 43 additions and 15 deletions


@@ -533,10 +533,14 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
         queue.reserve(connections.size() + 1);
 #ifdef OXENMQ_USE_EPOLL
-        bool process_control = false, process_worker = false, process_zap = false, process_all = false;
+        bool process_command = false, process_worker = false, process_zap = false, process_all = false;
         if (proxy_skip_one_poll) {
             proxy_skip_one_poll = false;
+            process_command = command.get(zmq::sockopt::events) & ZMQ_POLLIN;
+            process_worker = workers_socket.get(zmq::sockopt::events) & ZMQ_POLLIN;
+            process_zap = zap_auth.get(zmq::sockopt::events) & ZMQ_POLLIN;
             process_all = true;
         }
         else {
@@ -549,7 +553,7 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
             for (int i = 0; i < max; i++) {
                 const auto conn_id = evs[i].data.u64;
                 if (conn_id == EPOLL_COMMAND_ID)
-                    process_control = true;
+                    process_command = true;
                 else if (conn_id == EPOLL_WORKER_ID)
                     process_worker = true;
                 else if (conn_id == EPOLL_ZAP_ID)
@@ -573,28 +577,25 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
             zmq::poll(pollitems.data(), pollitems.size(), poll_timeout);
         }
-        constexpr bool process_control = true, process_worker = true, process_zap = true, process_all = true;
+        constexpr bool process_command = true, process_worker = true, process_zap = true, process_all = true;
 #endif
-        if (process_control || process_all) {
+        if (process_command) {
             OMQ_TRACE("processing control messages");
             // Retrieve any waiting incoming control messages
-            while (size_t len = recv_message_parts(command, control_parts, zmq::recv_flags::dontwait)) {
+            while (size_t len = recv_message_parts(command, control_parts, zmq::recv_flags::dontwait))
                 proxy_control_message(control_parts, len);
-            }
         }
-        if (process_worker || process_all) {
+        if (process_worker) {
             OMQ_TRACE("processing worker messages");
-            while (size_t len = recv_message_parts(workers_socket, control_parts, zmq::recv_flags::dontwait)) {
+            while (size_t len = recv_message_parts(workers_socket, control_parts, zmq::recv_flags::dontwait))
                 proxy_worker_message(control_parts, len);
-            }
         }
         OMQ_TRACE("processing timers");
         zmq_timers_execute(timers.get());
-        if (process_zap || process_all) {
+        if (process_zap) {
             // Handle any zap authentication
             OMQ_TRACE("processing zap requests");
             process_zap_requests();
@@ -607,11 +608,11 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
         OMQ_TRACE("processing new incoming messages");
         if (process_all) {
-            queue.resize(connections.size() + 1);
-            int i = 0;
+            queue.clear();
             for (auto& id_sock : connections)
-                queue[i++] = &id_sock;
-            queue[i] = nullptr;
+                if (id_sock.second.get(zmq::sockopt::events) & ZMQ_POLLIN)
+                    queue.push_back(&id_sock);
+            queue.push_back(nullptr);
         }
         size_t end = queue.size() - 1;
@@ -644,6 +645,33 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
             }
         }
+#ifdef OXENMQ_USE_EPOLL
+        // If any socket still has ZMQ_POLLIN (which is possible if something we did above changed
+        // state on another socket, perhaps by writing to it) then we need to repeat the loop
+        // *without* going back to epoll again, until we get through everything without any
+        // ZMQ_POLLIN sockets.  If we didn't, we could miss it and might end up deadlocked because
+        // of ZMQ's edge-triggered notifications on zmq fd's.
+        //
+        // More info on the complexities here at https://github.com/zeromq/libzmq/issues/3641 and
+        // https://funcptr.net/2012/09/10/zeromq---edge-triggered-notification/
+        if (!connections_updated && !proxy_skip_one_poll) {
+            for (auto* s : {&command, &workers_socket, &zap_auth}) {
+                if (s->get(zmq::sockopt::events) & ZMQ_POLLIN) {
+                    proxy_skip_one_poll = true;
+                    break;
+                }
+            }
+            if (!proxy_skip_one_poll) {
+                for (auto& [id, sock] : connections) {
+                    if (sock.get(zmq::sockopt::events) & ZMQ_POLLIN) {
+                        proxy_skip_one_poll = true;
+                        break;
+                    }
+                }
+            }
+        }
+#endif
         OMQ_TRACE("done proxy loop");
     }
 }