Merge remote-tracking branch 'origin/stable' into debian/trixie

This commit is contained in:
Jason Rhinelander 2023-09-28 11:11:06 -03:00
commit 7644c9ae7c
No known key found for this signature in database
GPG Key ID: C4992CE7A88D4262
1 changed files with 43 additions and 15 deletions

View File

@ -533,10 +533,14 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
queue.reserve(connections.size() + 1);
#ifdef OXENMQ_USE_EPOLL
bool process_control = false, process_worker = false, process_zap = false, process_all = false;
bool process_command = false, process_worker = false, process_zap = false, process_all = false;
if (proxy_skip_one_poll) {
proxy_skip_one_poll = false;
process_command = command.get(zmq::sockopt::events) & ZMQ_POLLIN;
process_worker = workers_socket.get(zmq::sockopt::events) & ZMQ_POLLIN;
process_zap = zap_auth.get(zmq::sockopt::events) & ZMQ_POLLIN;
process_all = true;
}
else {
@ -549,7 +553,7 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
for (int i = 0; i < max; i++) {
const auto conn_id = evs[i].data.u64;
if (conn_id == EPOLL_COMMAND_ID)
process_control = true;
process_command = true;
else if (conn_id == EPOLL_WORKER_ID)
process_worker = true;
else if (conn_id == EPOLL_ZAP_ID)
@ -573,28 +577,25 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
zmq::poll(pollitems.data(), pollitems.size(), poll_timeout);
}
constexpr bool process_control = true, process_worker = true, process_zap = true, process_all = true;
constexpr bool process_command = true, process_worker = true, process_zap = true, process_all = true;
#endif
if (process_control || process_all) {
if (process_command) {
OMQ_TRACE("processing control messages");
// Retrieve any waiting incoming control messages
while (size_t len = recv_message_parts(command, control_parts, zmq::recv_flags::dontwait)) {
while (size_t len = recv_message_parts(command, control_parts, zmq::recv_flags::dontwait))
proxy_control_message(control_parts, len);
}
}
if (process_worker || process_all) {
if (process_worker) {
OMQ_TRACE("processing worker messages");
while (size_t len = recv_message_parts(workers_socket, control_parts, zmq::recv_flags::dontwait)) {
while (size_t len = recv_message_parts(workers_socket, control_parts, zmq::recv_flags::dontwait))
proxy_worker_message(control_parts, len);
}
}
OMQ_TRACE("processing timers");
zmq_timers_execute(timers.get());
if (process_zap || process_all) {
if (process_zap) {
// Handle any zap authentication
OMQ_TRACE("processing zap requests");
process_zap_requests();
@ -607,11 +608,11 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
OMQ_TRACE("processing new incoming messages");
if (process_all) {
queue.resize(connections.size() + 1);
int i = 0;
queue.clear();
for (auto& id_sock : connections)
queue[i++] = &id_sock;
queue[i] = nullptr;
if (id_sock.second.get(zmq::sockopt::events) & ZMQ_POLLIN)
queue.push_back(&id_sock);
queue.push_back(nullptr);
}
size_t end = queue.size() - 1;
@ -644,6 +645,33 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
}
}
#ifdef OXENMQ_USE_EPOLL
// If any socket still has ZMQ_POLLIN (which is possible if something we did above changed
// state on another socket, perhaps by writing to it) then we need to repeat the loop
// *without* going back to epoll again, until we get through everything without any
// ZMQ_POLLIN sockets. If we didn't, we could miss it and might end up deadlocked because
// of ZMQ's edge-triggered notifications on zmq fd's.
//
// More info on the complexities here at https://github.com/zeromq/libzmq/issues/3641 and
// https://funcptr.net/2012/09/10/zeromq---edge-triggered-notification/
if (!connections_updated && !proxy_skip_one_poll) {
for (auto* s : {&command, &workers_socket, &zap_auth}) {
if (s->get(zmq::sockopt::events) & ZMQ_POLLIN) {
proxy_skip_one_poll = true;
break;
}
}
if (!proxy_skip_one_poll) {
for (auto& [id, sock] : connections) {
if (sock.get(zmq::sockopt::events) & ZMQ_POLLIN) {
proxy_skip_one_poll = true;
break;
}
}
}
}
#endif
OMQ_TRACE("done proxy loop");
}
}