mirror of
https://github.com/oxen-io/oxen-mq.git
synced 2023-12-13 21:00:31 +01:00
Merge pull request #88 from jagerman/epoll
epoll: always retrieve events from triggered sockets
This commit is contained in:
commit
dc7fb35493
1 changed files with 43 additions and 15 deletions
|
@ -533,10 +533,14 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
|
|||
queue.reserve(connections.size() + 1);
|
||||
|
||||
#ifdef OXENMQ_USE_EPOLL
|
||||
bool process_control = false, process_worker = false, process_zap = false, process_all = false;
|
||||
bool process_command = false, process_worker = false, process_zap = false, process_all = false;
|
||||
|
||||
if (proxy_skip_one_poll) {
|
||||
proxy_skip_one_poll = false;
|
||||
|
||||
process_command = command.get(zmq::sockopt::events) & ZMQ_POLLIN;
|
||||
process_worker = workers_socket.get(zmq::sockopt::events) & ZMQ_POLLIN;
|
||||
process_zap = zap_auth.get(zmq::sockopt::events) & ZMQ_POLLIN;
|
||||
process_all = true;
|
||||
}
|
||||
else {
|
||||
|
@ -549,7 +553,7 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
|
|||
for (int i = 0; i < max; i++) {
|
||||
const auto conn_id = evs[i].data.u64;
|
||||
if (conn_id == EPOLL_COMMAND_ID)
|
||||
process_control = true;
|
||||
process_command = true;
|
||||
else if (conn_id == EPOLL_WORKER_ID)
|
||||
process_worker = true;
|
||||
else if (conn_id == EPOLL_ZAP_ID)
|
||||
|
@ -573,28 +577,25 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
|
|||
zmq::poll(pollitems.data(), pollitems.size(), poll_timeout);
|
||||
}
|
||||
|
||||
constexpr bool process_control = true, process_worker = true, process_zap = true, process_all = true;
|
||||
constexpr bool process_command = true, process_worker = true, process_zap = true, process_all = true;
|
||||
#endif
|
||||
|
||||
if (process_control || process_all) {
|
||||
if (process_command) {
|
||||
OMQ_TRACE("processing control messages");
|
||||
// Retrieve any waiting incoming control messages
|
||||
while (size_t len = recv_message_parts(command, control_parts, zmq::recv_flags::dontwait)) {
|
||||
while (size_t len = recv_message_parts(command, control_parts, zmq::recv_flags::dontwait))
|
||||
proxy_control_message(control_parts, len);
|
||||
}
|
||||
}
|
||||
|
||||
if (process_worker || process_all) {
|
||||
if (process_worker) {
|
||||
OMQ_TRACE("processing worker messages");
|
||||
while (size_t len = recv_message_parts(workers_socket, control_parts, zmq::recv_flags::dontwait)) {
|
||||
while (size_t len = recv_message_parts(workers_socket, control_parts, zmq::recv_flags::dontwait))
|
||||
proxy_worker_message(control_parts, len);
|
||||
}
|
||||
}
|
||||
|
||||
OMQ_TRACE("processing timers");
|
||||
zmq_timers_execute(timers.get());
|
||||
|
||||
if (process_zap || process_all) {
|
||||
if (process_zap) {
|
||||
// Handle any zap authentication
|
||||
OMQ_TRACE("processing zap requests");
|
||||
process_zap_requests();
|
||||
|
@ -607,11 +608,11 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
|
|||
|
||||
OMQ_TRACE("processing new incoming messages");
|
||||
if (process_all) {
|
||||
queue.resize(connections.size() + 1);
|
||||
int i = 0;
|
||||
queue.clear();
|
||||
for (auto& id_sock : connections)
|
||||
queue[i++] = &id_sock;
|
||||
queue[i] = nullptr;
|
||||
if (id_sock.second.get(zmq::sockopt::events) & ZMQ_POLLIN)
|
||||
queue.push_back(&id_sock);
|
||||
queue.push_back(nullptr);
|
||||
}
|
||||
|
||||
size_t end = queue.size() - 1;
|
||||
|
@ -644,6 +645,33 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
|
|||
}
|
||||
}
|
||||
|
||||
#ifdef OXENMQ_USE_EPOLL
|
||||
// If any socket still has ZMQ_POLLIN (which is possible if something we did above changed
|
||||
// state on another socket, perhaps by writing to it) then we need to repeat the loop
|
||||
// *without* going back to epoll again, until we get through everything without any
|
||||
// ZMQ_POLLIN sockets. If we didn't, we could miss it and might end up deadlocked because
|
||||
// of ZMQ's edge-triggered notifications on zmq fd's.
|
||||
//
|
||||
// More info on the complexities here at https://github.com/zeromq/libzmq/issues/3641 and
|
||||
// https://funcptr.net/2012/09/10/zeromq---edge-triggered-notification/
|
||||
if (!connections_updated && !proxy_skip_one_poll) {
|
||||
for (auto* s : {&command, &workers_socket, &zap_auth}) {
|
||||
if (s->get(zmq::sockopt::events) & ZMQ_POLLIN) {
|
||||
proxy_skip_one_poll = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!proxy_skip_one_poll) {
|
||||
for (auto& [id, sock] : connections) {
|
||||
if (sock.get(zmq::sockopt::events) & ZMQ_POLLIN) {
|
||||
proxy_skip_one_poll = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
OMQ_TRACE("done proxy loop");
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue