mirror of https://github.com/oxen-io/oxen-mq.git
Compare commits
8 Commits
d3f9484fdd
...
095fe74972
Author | SHA1 | Date |
---|---|---|
Jason Rhinelander | 095fe74972 | |
Jason Rhinelander | 03ff5bde83 | |
Jason Rhinelander | 2c844214eb | |
Jason Rhinelander | 7644c9ae7c | |
Jason Rhinelander | 5b8597d308 | |
Jason Rhinelander | dc7fb35493 | |
Jason Rhinelander | caadd35052 | |
Jason Rhinelander | 4f6dc35ea1 |
|
@ -1,10 +1,16 @@
|
|||
oxenmq (1.2.16-0pre2-1~deb13) trixie; urgency=medium
|
||||
oxenmq (1.2.16-1~deb13) trixie; urgency=medium
|
||||
|
||||
* 1.2.16 release
|
||||
|
||||
-- Jason Rhinelander <jason@imaginary.ca> Thu, 28 Sep 2023 11:15:14 -0300
|
||||
|
||||
oxenmq (1.2.16~0pre2-1~deb13) trixie; urgency=medium
|
||||
|
||||
* trixie deb
|
||||
|
||||
-- Jason Rhinelander <jason@imaginary.ca> Wed, 27 Sep 2023 20:00:11 -0300
|
||||
|
||||
oxenmq (1.2.16-0pre2-1) unstable; urgency=medium
|
||||
oxenmq (1.2.16~0pre2-1) unstable; urgency=medium
|
||||
|
||||
* epoll: always retrieve events from triggered sockets
|
||||
|
||||
|
|
|
@ -1,59 +0,0 @@
|
|||
From: Jason Rhinelander <jason@imaginary.ca>
|
||||
Date: Fri, 15 Sep 2023 15:51:04 -0300
|
||||
Subject: epoll: always retrieve events from triggered sockets
|
||||
|
||||
The epoll approach sometimes hangs sockets: apparently the call to
|
||||
getting the ZMQ events off a socket has internal ZMQ side effects (see
|
||||
libzmq issue 3641 for more), so make sure we always call it on triggered
|
||||
sockets when using epoll.
|
||||
---
|
||||
oxenmq/proxy.cpp | 24 ++++++++++++++++--------
|
||||
1 file changed, 16 insertions(+), 8 deletions(-)
|
||||
|
||||
diff --git a/oxenmq/proxy.cpp b/oxenmq/proxy.cpp
|
||||
index 8d4df9d..ef9cee2 100644
|
||||
--- a/oxenmq/proxy.cpp
|
||||
+++ b/oxenmq/proxy.cpp
|
||||
@@ -547,15 +547,20 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
|
||||
|
||||
queue.clear();
|
||||
for (int i = 0; i < max; i++) {
|
||||
+ // Querying the zmq events here is required as it has side effects of processing
|
||||
+ // things on the socket that isn't done by just trying to read off the socket below;
|
||||
+ // see https://github.com/zeromq/libzmq/issues/3641 for details.
|
||||
const auto conn_id = evs[i].data.u64;
|
||||
if (conn_id == EPOLL_COMMAND_ID)
|
||||
- process_control = true;
|
||||
+ process_control = command.get(zmq::sockopt::events) & ZMQ_POLLIN;
|
||||
else if (conn_id == EPOLL_WORKER_ID)
|
||||
- process_worker = true;
|
||||
+ process_worker = workers_socket.get(zmq::sockopt::events) & ZMQ_POLLIN;
|
||||
else if (conn_id == EPOLL_ZAP_ID)
|
||||
- process_zap = true;
|
||||
- else if (auto it = connections.find(conn_id); it != connections.end())
|
||||
- queue.push_back(&*it);
|
||||
+ process_zap = zap_auth.get(zmq::sockopt::events) & ZMQ_POLLIN;
|
||||
+ else if (auto it = connections.find(conn_id); it != connections.end()) {
|
||||
+ if (it->second.get(zmq::sockopt::events) & ZMQ_POLLIN)
|
||||
+ queue.push_back(&*it);
|
||||
+ }
|
||||
}
|
||||
queue.push_back(nullptr);
|
||||
}
|
||||
@@ -608,10 +613,13 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
|
||||
OMQ_TRACE("processing new incoming messages");
|
||||
if (process_all) {
|
||||
queue.resize(connections.size() + 1);
|
||||
- int i = 0;
|
||||
+ size_t i = 0;
|
||||
for (auto& id_sock : connections)
|
||||
- queue[i++] = &id_sock;
|
||||
- queue[i] = nullptr;
|
||||
+ if (id_sock.second.get(zmq::sockopt::events) & ZMQ_POLLIN)
|
||||
+ queue[i++] = &id_sock;
|
||||
+ queue[i++] = nullptr;
|
||||
+ if (queue.size() > i)
|
||||
+ queue.resize(i);
|
||||
}
|
||||
|
||||
size_t end = queue.size() - 1;
|
|
@ -1,2 +1 @@
|
|||
0002-Fully-version-library.patch
|
||||
0002-epoll-always-retrieve-events-from-triggered-sockets.patch
|
||||
|
|
|
@ -533,10 +533,14 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
|
|||
queue.reserve(connections.size() + 1);
|
||||
|
||||
#ifdef OXENMQ_USE_EPOLL
|
||||
bool process_control = false, process_worker = false, process_zap = false, process_all = false;
|
||||
bool process_command = false, process_worker = false, process_zap = false, process_all = false;
|
||||
|
||||
if (proxy_skip_one_poll) {
|
||||
proxy_skip_one_poll = false;
|
||||
|
||||
process_command = command.get(zmq::sockopt::events) & ZMQ_POLLIN;
|
||||
process_worker = workers_socket.get(zmq::sockopt::events) & ZMQ_POLLIN;
|
||||
process_zap = zap_auth.get(zmq::sockopt::events) & ZMQ_POLLIN;
|
||||
process_all = true;
|
||||
}
|
||||
else {
|
||||
|
@ -549,7 +553,7 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
|
|||
for (int i = 0; i < max; i++) {
|
||||
const auto conn_id = evs[i].data.u64;
|
||||
if (conn_id == EPOLL_COMMAND_ID)
|
||||
process_control = true;
|
||||
process_command = true;
|
||||
else if (conn_id == EPOLL_WORKER_ID)
|
||||
process_worker = true;
|
||||
else if (conn_id == EPOLL_ZAP_ID)
|
||||
|
@ -573,28 +577,25 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
|
|||
zmq::poll(pollitems.data(), pollitems.size(), poll_timeout);
|
||||
}
|
||||
|
||||
constexpr bool process_control = true, process_worker = true, process_zap = true, process_all = true;
|
||||
constexpr bool process_command = true, process_worker = true, process_zap = true, process_all = true;
|
||||
#endif
|
||||
|
||||
if (process_control || process_all) {
|
||||
if (process_command) {
|
||||
OMQ_TRACE("processing control messages");
|
||||
// Retrieve any waiting incoming control messages
|
||||
while (size_t len = recv_message_parts(command, control_parts, zmq::recv_flags::dontwait)) {
|
||||
while (size_t len = recv_message_parts(command, control_parts, zmq::recv_flags::dontwait))
|
||||
proxy_control_message(control_parts, len);
|
||||
}
|
||||
}
|
||||
|
||||
if (process_worker || process_all) {
|
||||
if (process_worker) {
|
||||
OMQ_TRACE("processing worker messages");
|
||||
while (size_t len = recv_message_parts(workers_socket, control_parts, zmq::recv_flags::dontwait)) {
|
||||
while (size_t len = recv_message_parts(workers_socket, control_parts, zmq::recv_flags::dontwait))
|
||||
proxy_worker_message(control_parts, len);
|
||||
}
|
||||
}
|
||||
|
||||
OMQ_TRACE("processing timers");
|
||||
zmq_timers_execute(timers.get());
|
||||
|
||||
if (process_zap || process_all) {
|
||||
if (process_zap) {
|
||||
// Handle any zap authentication
|
||||
OMQ_TRACE("processing zap requests");
|
||||
process_zap_requests();
|
||||
|
@ -607,11 +608,11 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
|
|||
|
||||
OMQ_TRACE("processing new incoming messages");
|
||||
if (process_all) {
|
||||
queue.resize(connections.size() + 1);
|
||||
int i = 0;
|
||||
queue.clear();
|
||||
for (auto& id_sock : connections)
|
||||
queue[i++] = &id_sock;
|
||||
queue[i] = nullptr;
|
||||
if (id_sock.second.get(zmq::sockopt::events) & ZMQ_POLLIN)
|
||||
queue.push_back(&id_sock);
|
||||
queue.push_back(nullptr);
|
||||
}
|
||||
|
||||
size_t end = queue.size() - 1;
|
||||
|
@ -644,6 +645,33 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
|
|||
}
|
||||
}
|
||||
|
||||
#ifdef OXENMQ_USE_EPOLL
|
||||
// If any socket still has ZMQ_POLLIN (which is possible if something we did above changed
|
||||
// state on another socket, perhaps by writing to it) then we need to repeat the loop
|
||||
// *without* going back to epoll again, until we get through everything without any
|
||||
// ZMQ_POLLIN sockets. If we didn't, we could miss it and might end up deadlocked because
|
||||
// of ZMQ's edge-triggered notifications on zmq fd's.
|
||||
//
|
||||
// More info on the complexities here at https://github.com/zeromq/libzmq/issues/3641 and
|
||||
// https://funcptr.net/2012/09/10/zeromq---edge-triggered-notification/
|
||||
if (!connections_updated && !proxy_skip_one_poll) {
|
||||
for (auto* s : {&command, &workers_socket, &zap_auth}) {
|
||||
if (s->get(zmq::sockopt::events) & ZMQ_POLLIN) {
|
||||
proxy_skip_one_poll = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!proxy_skip_one_poll) {
|
||||
for (auto& [id, sock] : connections) {
|
||||
if (sock.get(zmq::sockopt::events) & ZMQ_POLLIN) {
|
||||
proxy_skip_one_poll = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
OMQ_TRACE("done proxy loop");
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue