mirror of https://github.com/oxen-io/oxen-mq.git
Use fixed vector for idle workers
Use a count + fixed size vector with a separate variable tracking the size seems to perform slightly better than popping/pushing the vector.
This commit is contained in:
parent
45791d3a19
commit
d86ecb3a70
|
@ -456,8 +456,9 @@ private:
|
|||
/// Router socket to reach internal worker threads from proxy
|
||||
zmq::socket_t workers_socket{context, zmq::socket_type::router};
|
||||
|
||||
/// indices of idle, active workers
|
||||
/// indices of idle, active workers; note that this vector is usually oversized
|
||||
std::vector<unsigned int> idle_workers;
|
||||
size_t idle_worker_count = 0; // Actual # elements of idle_workers in use
|
||||
|
||||
/// Maximum number of general task workers, specified by set_general_threads()
|
||||
int general_workers = std::max<int>(1, std::thread::hardware_concurrency());
|
||||
|
@ -469,7 +470,7 @@ private:
|
|||
int max_workers;
|
||||
|
||||
/// Number of active workers
|
||||
int active_workers() const { return workers.size() - idle_workers.size(); }
|
||||
int active_workers() const { return workers.size() - idle_worker_count; }
|
||||
|
||||
/// Worker thread loop. Tagged and start are provided for a tagged worker thread.
|
||||
void worker_thread(unsigned int index, std::optional<std::string> tagged = std::nullopt, std::function<void()> start = nullptr);
|
||||
|
|
|
@ -325,9 +325,9 @@ void OxenMQ::proxy_control_message(OxenMQ::control_message_array& parts, size_t
|
|||
// close workers as they come back to READY status, and then close external
|
||||
// connections once all workers are done.
|
||||
max_workers = 0;
|
||||
for (const auto &route : idle_workers)
|
||||
route_control(workers_socket, workers[route].worker_routing_id, "QUIT");
|
||||
idle_workers.clear();
|
||||
for (size_t i = 0; i < idle_worker_count; i++)
|
||||
route_control(workers_socket, workers[idle_workers[i]].worker_routing_id, "QUIT");
|
||||
idle_worker_count = 0;
|
||||
for (auto& [run, busy, queue] : tagged_workers)
|
||||
if (!busy)
|
||||
route_control(workers_socket, run.worker_routing_id, "QUIT");
|
||||
|
@ -404,6 +404,7 @@ void OxenMQ::proxy_loop_init() {
|
|||
}
|
||||
|
||||
workers.reserve(max_workers);
|
||||
idle_workers.resize(max_workers);
|
||||
if (!workers.empty())
|
||||
throw std::logic_error("Internal error: proxy thread started with active worker threads");
|
||||
|
||||
|
|
|
@ -162,9 +162,8 @@ void OxenMQ::worker_thread(unsigned int index, std::optional<std::string> tagged
|
|||
|
||||
|
||||
OxenMQ::run_info& OxenMQ::get_idle_worker() {
|
||||
if (idle_workers.empty()) {
|
||||
size_t id = workers.size();
|
||||
assert(workers.capacity() > id);
|
||||
if (idle_worker_count == 0) {
|
||||
uint32_t id = workers.size();
|
||||
workers.emplace_back();
|
||||
auto& r = workers.back();
|
||||
r.worker_id = id;
|
||||
|
@ -172,8 +171,7 @@ OxenMQ::run_info& OxenMQ::get_idle_worker() {
|
|||
r.worker_routing_name = "w" + std::to_string(id);
|
||||
return r;
|
||||
}
|
||||
size_t id = idle_workers.back();
|
||||
idle_workers.pop_back();
|
||||
size_t id = idle_workers[--idle_worker_count];
|
||||
return workers[id];
|
||||
}
|
||||
|
||||
|
@ -259,7 +257,7 @@ void OxenMQ::proxy_worker_message(OxenMQ::control_message_array& parts, size_t l
|
|||
OMQ_TRACE("Telling worker ", route, " to quit");
|
||||
route_control(workers_socket, route, "QUIT");
|
||||
} else if (!tagged_worker) {
|
||||
idle_workers.push_back(worker_id);
|
||||
idle_workers[idle_worker_count++] = worker_id;
|
||||
}
|
||||
} else if (cmd == "QUITTING"sv) {
|
||||
run.worker_thread.join();
|
||||
|
|
Loading…
Reference in New Issue