Move socket holding into LokiMQ instance

The thread_local `std::map` here can end up being destructed *before*
the LokiMQ instance (if both are being destroyed during thread joining),
in which case we segfault by trying to use the map.  Move the owning
container into the LokiMQ instead (indexed by the thread) to prevent
that.
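
To make the failure mode concrete, here is a minimal sketch of the hazard (hypothetical names, not the actual LokiMQ code): any teardown path that can run after a thread's thread_local storage has been destroyed must not touch that storage.

    #include <map>

    struct Queue {
        ~Queue() { shutdown(); }
        void shutdown();
    };

    // Destroyed during thread/program teardown; its destruction order relative
    // to other objects being torn down at the same time is fragile.
    thread_local std::map<int, int> per_thread_cache;

    // If ~Queue() runs after per_thread_cache has already been destroyed,
    // this touches a dead object: undefined behavior, typically a segfault.
    void Queue::shutdown() { per_thread_cache.clear(); }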

Also cleans this code up by:

- Don't close control sockets from the proxy thread; socket_t's aren't
necessarily thread safe, so this could be causing issues where we end up
double-closing or using a closed socket.  We can just let them get closed
during destruction of the LokiMQ.

- Avoid needing shared_ptrs; instead we can just use a unique_ptr in the
owning container, with raw pointers in the thread_local cache (see the
sketch after this list).  This simplifies closing because all closing
happens during the LokiMQ destruction.
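
As a rough sketch of the resulting layout (simplified from the diffs below), the owning container moves into the instance while each thread keeps only a raw-pointer cache:

    // Owned by the LokiMQ instance, so the sockets live exactly as long as it does:
    std::unordered_map<std::thread::id, std::unique_ptr<zmq::socket_t>> control_sockets;

    // Per-thread cache (inside get_control_socket()).  Raw pointers are fine
    // here: each socket is heap-allocated and owned by the unique_ptr in the
    // map above, so its address stays stable even if the map rehashes.
    static thread_local int last_id = -1;
    static thread_local zmq::socket_t* last_socket = nullptr;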
Jason Rhinelander 2020-11-13 15:12:25 -04:00
parent d889f308ae
commit 253f1ee66e
3 changed files with 19 additions and 28 deletions


@@ -4,6 +4,7 @@
 #include <map>
 #include <random>
 #include <ostream>
+#include <thread>
 
 extern "C" {
 #include <sodium/core.h>
@@ -162,33 +163,28 @@ std::atomic<int> next_id{1};
 zmq::socket_t& LokiMQ::get_control_socket() {
     assert(proxy_thread.joinable());
 
-    // Maps the LokiMQ unique ID to a local thread command socket.
-    static thread_local std::map<int, std::shared_ptr<zmq::socket_t>> control_sockets;
-    static thread_local std::pair<int, std::shared_ptr<zmq::socket_t>> last{-1, nullptr};
-
     // Optimize by caching the last value; LokiMQ is often a singleton and in that case we're
     // going to *always* hit this optimization.  Even if it isn't, we're probably likely to need the
     // same control socket from the same thread multiple times sequentially so this may still help.
-    if (object_id == last.first)
-        return *last.second;
-    auto it = control_sockets.find(object_id);
-    if (it != control_sockets.end()) {
-        last = *it;
-        return *last.second;
-    }
+    static thread_local int last_id = -1;
+    static thread_local zmq::socket_t* last_socket = nullptr;
+    if (object_id == last_id)
+        return *last_socket;
 
     std::lock_guard lock{control_sockets_mutex};
     if (proxy_shutting_down)
         throw std::runtime_error("Unable to obtain LokiMQ control socket: proxy thread is shutting down");
-    auto control = std::make_shared<zmq::socket_t>(context, zmq::socket_type::dealer);
-    control->set(zmq::sockopt::linger, 0);
-    control->connect(SN_ADDR_COMMAND);
-    thread_control_sockets.push_back(control);
-    control_sockets.emplace(object_id, control);
-    last.first = object_id;
-    last.second = std::move(control);
-    return *last.second;
+    auto& socket = control_sockets[std::this_thread::get_id()];
+    if (!socket) {
+        socket = std::make_unique<zmq::socket_t>(context, zmq::socket_type::dealer);
+        socket->set(zmq::sockopt::linger, 0);
+        socket->connect(SN_ADDR_COMMAND);
+    }
+    last_id = object_id;
+    last_socket = socket.get();
+    return *last_socket;
 }
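
One detail worth noting in the new lookup: std::unordered_map::operator[] value-initializes a missing entry, so a thread's first call gets a null unique_ptr and falls into the creation branch. A standalone illustration (using int rather than a real socket):

    #include <cassert>
    #include <memory>
    #include <thread>
    #include <unordered_map>

    void demo() {
        std::unordered_map<std::thread::id, std::unique_ptr<int>> m;
        auto& p = m[std::this_thread::get_id()];  // inserts a null unique_ptr
        assert(!p);                               // first access: nothing created yet
        if (!p) p = std::make_unique<int>(42);    // lazily construct, as above
    }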


@@ -131,18 +131,15 @@ private:
     /// We have one seldom-used mutex here: it is generally locked just once per thread (the first
     /// time the thread calls get_control_socket()) and once more by the proxy thread when it shuts
-    /// down, and so will not be a contention point.
+    /// down.
     std::mutex control_sockets_mutex;
 
     /// Called to obtain a "command" socket that attaches to `control` to send commands to the
     /// proxy thread from other threads.  This socket is unique per thread and LokiMQ instance.
     zmq::socket_t& get_control_socket();
 
-    /// Stores all of the sockets created in different threads via `get_control_socket`.  This is
-    /// only used during destruction to close all of those open sockets, and is protected by an
-    /// internal mutex which is only locked by new threads getting a control socket and the
-    /// destructor.
-    std::vector<std::shared_ptr<zmq::socket_t>> thread_control_sockets;
+    /// Per-thread control sockets used by lokimq threads to talk to this object's proxy thread.
+    std::unordered_map<std::thread::id, std::unique_ptr<zmq::socket_t>> control_sockets;
 
 public:
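
This relies on cppzmq's zmq::socket_t closing its underlying handle in its destructor, so destroying the map (when the LokiMQ instance is destroyed) closes every per-thread socket with no explicit close loop. For example (assuming a zmq::context_t named ctx is in scope):

    {
        zmq::socket_t s{ctx, zmq::socket_type::dealer};
        // ... use s ...
    }   // ~socket_t() closes the handle here; tearing down `control_sockets`
        // does the same for each socket it owns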


@@ -29,8 +29,6 @@ void LokiMQ::proxy_quit() {
     command.close();
     {
         std::lock_guard lock{control_sockets_mutex};
-        for (auto &control : thread_control_sockets)
-            control->close();
         proxy_shutting_down = true; // To prevent threads from opening new control sockets
     }
     workers_socket.close();
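
With the close loop gone, proxy_quit() only flips proxy_shutting_down under the mutex; the sockets themselves are reclaimed by normal member destruction. A hedged sketch of the assumed teardown order (the real destructor body is not part of this diff):

    // Hypothetical outline, not verbatim from the repo:
    LokiMQ::~LokiMQ() {
        // ... tell the proxy thread to quit (details elided) ...
        if (proxy_thread.joinable())
            proxy_thread.join();
    }   // after the body runs, members -- including `control_sockets` --
        // are destroyed, closing every remaining per-thread command socket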