Compare commits

...

8 Commits

Author SHA1 Message Date
Jason Rhinelander 2bee44f025
1.2.16 release 2023-09-28 11:15:13 -03:00
Jason Rhinelander 0ef4ff1c91
fix pre version numbers 2023-09-28 11:13:44 -03:00
Jason Rhinelander f15948c51b
Merge remote-tracking branch 'origin/stable' into debian/bookworm 2023-09-28 11:11:04 -03:00
Jason Rhinelander 0d947b220b
Rediff patches
Drop 0002-epoll-always-retrieve-events-from-triggered-sockets.patch: superseded by the upstream epoll fix now merged in from origin/stable
2023-09-28 11:10:34 -03:00
Jason Rhinelander 5b8597d308
Merge remote-tracking branch 'origin/dev' into stable 2023-09-28 11:07:54 -03:00
Jason Rhinelander dc7fb35493
Merge pull request #88 from jagerman/epoll
epoll: always retrieve events from triggered sockets
2023-09-16 12:24:57 -03:00
Jason Rhinelander caadd35052
epoll: fix hang on heavily loaded sockets
This fixes a hang in the epoll code that triggers on heavy, bursty
connections (such as the live SPNS APNs notifier).

It turns out that side effects of processing our sockets could leave
other sockets (that we processed earlier in the loop) in a
needs-attention state which we might not notice if we go back to
epoll_wait right away.  zmq::poll apparently takes care of this (and so
is safe to re-poll even in this state), but when we are using epoll we
need to handle it ourselves by always checking for zmq events (which
itself has side effects) and, if we get any, re-entering the loop body
immediately *without* polling to deal with them.
2023-09-15 18:29:23 -03:00
Jason Rhinelander 4f6dc35ea1
Merge remote-tracking branch 'origin/dev' into stable 2023-07-17 13:53:17 -03:00
4 changed files with 51 additions and 77 deletions

10
debian/changelog vendored
View File

@ -1,10 +1,16 @@
oxenmq (1.2.16-0pre2-1~deb12) bookworm; urgency=medium
oxenmq (1.2.16-1~deb12) bookworm; urgency=medium
* 1.2.16 release
-- Jason Rhinelander <jason@imaginary.ca> Thu, 28 Sep 2023 11:15:09 -0300
oxenmq (1.2.16~pre2-1~deb12) bookworm; urgency=medium
* epoll: always retrieve events from triggered sockets
-- Jason Rhinelander <jason@imaginary.ca> Fri, 15 Sep 2023 16:12:20 -0300
oxenmq (1.2.16-0pre1-1~deb12) bookworm; urgency=medium
oxenmq (1.2.16~pre1-1~deb12) bookworm; urgency=medium
* Drop long-deprecated liblokimq-dev package
* Redo random string generation

View File

@ -1,59 +0,0 @@
From: Jason Rhinelander <jason@imaginary.ca>
Date: Fri, 15 Sep 2023 15:51:04 -0300
Subject: epoll: always retrieve events from triggered sockets
The epoll approach sometimes hangs sockets: apparently the call to
get the ZMQ events off a socket has internal ZMQ side effects (see
libzmq issue 3641 for more), so make sure we always call it on triggered
sockets when using epoll.
---
oxenmq/proxy.cpp | 24 ++++++++++++++++--------
1 file changed, 16 insertions(+), 8 deletions(-)
diff --git a/oxenmq/proxy.cpp b/oxenmq/proxy.cpp
index 8d4df9d..ef9cee2 100644
--- a/oxenmq/proxy.cpp
+++ b/oxenmq/proxy.cpp
@@ -547,15 +547,20 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
queue.clear();
for (int i = 0; i < max; i++) {
+ // Querying the zmq events here is required as it has side effects of processing
+ // things on the socket that isn't done by just trying to read off the socket below;
+ // see https://github.com/zeromq/libzmq/issues/3641 for details.
const auto conn_id = evs[i].data.u64;
if (conn_id == EPOLL_COMMAND_ID)
- process_control = true;
+ process_control = command.get(zmq::sockopt::events) & ZMQ_POLLIN;
else if (conn_id == EPOLL_WORKER_ID)
- process_worker = true;
+ process_worker = workers_socket.get(zmq::sockopt::events) & ZMQ_POLLIN;
else if (conn_id == EPOLL_ZAP_ID)
- process_zap = true;
- else if (auto it = connections.find(conn_id); it != connections.end())
- queue.push_back(&*it);
+ process_zap = zap_auth.get(zmq::sockopt::events) & ZMQ_POLLIN;
+ else if (auto it = connections.find(conn_id); it != connections.end()) {
+ if (it->second.get(zmq::sockopt::events) & ZMQ_POLLIN)
+ queue.push_back(&*it);
+ }
}
queue.push_back(nullptr);
}
@@ -608,10 +613,13 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
OMQ_TRACE("processing new incoming messages");
if (process_all) {
queue.resize(connections.size() + 1);
- int i = 0;
+ size_t i = 0;
for (auto& id_sock : connections)
- queue[i++] = &id_sock;
- queue[i] = nullptr;
+ if (id_sock.second.get(zmq::sockopt::events) & ZMQ_POLLIN)
+ queue[i++] = &id_sock;
+ queue[i++] = nullptr;
+ if (queue.size() > i)
+ queue.resize(i);
}
size_t end = queue.size() - 1;

View File

@ -1,2 +1 @@
0002-Fully-version-library.patch
0002-epoll-always-retrieve-events-from-triggered-sockets.patch

View File

@ -533,10 +533,14 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
queue.reserve(connections.size() + 1);
#ifdef OXENMQ_USE_EPOLL
bool process_control = false, process_worker = false, process_zap = false, process_all = false;
bool process_command = false, process_worker = false, process_zap = false, process_all = false;
if (proxy_skip_one_poll) {
proxy_skip_one_poll = false;
process_command = command.get(zmq::sockopt::events) & ZMQ_POLLIN;
process_worker = workers_socket.get(zmq::sockopt::events) & ZMQ_POLLIN;
process_zap = zap_auth.get(zmq::sockopt::events) & ZMQ_POLLIN;
process_all = true;
}
else {
@ -549,7 +553,7 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
for (int i = 0; i < max; i++) {
const auto conn_id = evs[i].data.u64;
if (conn_id == EPOLL_COMMAND_ID)
process_control = true;
process_command = true;
else if (conn_id == EPOLL_WORKER_ID)
process_worker = true;
else if (conn_id == EPOLL_ZAP_ID)
@ -573,28 +577,25 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
zmq::poll(pollitems.data(), pollitems.size(), poll_timeout);
}
constexpr bool process_control = true, process_worker = true, process_zap = true, process_all = true;
constexpr bool process_command = true, process_worker = true, process_zap = true, process_all = true;
#endif
if (process_control || process_all) {
if (process_command) {
OMQ_TRACE("processing control messages");
// Retrieve any waiting incoming control messages
while (size_t len = recv_message_parts(command, control_parts, zmq::recv_flags::dontwait)) {
while (size_t len = recv_message_parts(command, control_parts, zmq::recv_flags::dontwait))
proxy_control_message(control_parts, len);
}
}
if (process_worker || process_all) {
if (process_worker) {
OMQ_TRACE("processing worker messages");
while (size_t len = recv_message_parts(workers_socket, control_parts, zmq::recv_flags::dontwait)) {
while (size_t len = recv_message_parts(workers_socket, control_parts, zmq::recv_flags::dontwait))
proxy_worker_message(control_parts, len);
}
}
OMQ_TRACE("processing timers");
zmq_timers_execute(timers.get());
if (process_zap || process_all) {
if (process_zap) {
// Handle any zap authentication
OMQ_TRACE("processing zap requests");
process_zap_requests();
@ -607,11 +608,11 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
OMQ_TRACE("processing new incoming messages");
if (process_all) {
queue.resize(connections.size() + 1);
int i = 0;
queue.clear();
for (auto& id_sock : connections)
queue[i++] = &id_sock;
queue[i] = nullptr;
if (id_sock.second.get(zmq::sockopt::events) & ZMQ_POLLIN)
queue.push_back(&id_sock);
queue.push_back(nullptr);
}
size_t end = queue.size() - 1;
@ -644,6 +645,33 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
}
}
#ifdef OXENMQ_USE_EPOLL
// If any socket still has ZMQ_POLLIN (which is possible if something we did above changed
// state on another socket, perhaps by writing to it) then we need to repeat the loop
// *without* going back to epoll again, until we get through everything without any
// ZMQ_POLLIN sockets. If we didn't, we could miss it and might end up deadlocked because
// of ZMQ's edge-triggered notifications on zmq fd's.
//
// More info on the complexities here at https://github.com/zeromq/libzmq/issues/3641 and
// https://funcptr.net/2012/09/10/zeromq---edge-triggered-notification/
if (!connections_updated && !proxy_skip_one_poll) {
for (auto* s : {&command, &workers_socket, &zap_auth}) {
if (s->get(zmq::sockopt::events) & ZMQ_POLLIN) {
proxy_skip_one_poll = true;
break;
}
}
if (!proxy_skip_one_poll) {
for (auto& [id, sock] : connections) {
if (sock.get(zmq::sockopt::events) & ZMQ_POLLIN) {
proxy_skip_one_poll = true;
break;
}
}
}
}
#endif
OMQ_TRACE("done proxy loop");
}
}