mirror of https://github.com/oxen-io/oxen-mq.git
Use raw index bytes in worker router
Change the internal worker routing id to be "w" followed by the raw integer bytes, so that we can just memcpy them into a uint32_t rather than needing to do str -> integer conversion on each received worker message. (This also eliminates a vestigal call into oxenc internals).
This commit is contained in:
parent
fa6de369b2
commit
b8e4eb148f
|
@ -170,8 +170,9 @@ TaggedThreadID OxenMQ::add_tagged_thread(std::string name, std::function<void()>
|
|||
auto& [run, busy, queue] = tagged_workers.emplace_back();
|
||||
busy = false;
|
||||
run.worker_id = tagged_workers.size(); // We want index + 1 (b/c 0 is used for non-tagged jobs)
|
||||
run.worker_routing_id = "t" + std::to_string(run.worker_id);
|
||||
OMQ_TRACE("Created new tagged thread ", name, " with routing id ", run.worker_routing_id);
|
||||
run.worker_routing_name = "t" + std::to_string(run.worker_id);
|
||||
run.worker_routing_id = "t" + std::string{reinterpret_cast<const char*>(&run.worker_id), sizeof(run.worker_id)};
|
||||
OMQ_TRACE("Created new tagged thread ", name, " with routing id ", run.worker_routing_name);
|
||||
|
||||
run.worker_thread = std::thread{&OxenMQ::worker_thread, this, run.worker_id, name, std::move(start)};
|
||||
|
||||
|
|
|
@ -744,8 +744,9 @@ private:
|
|||
|
||||
// These belong to the proxy thread and must not be accessed by a worker:
|
||||
std::thread worker_thread;
|
||||
size_t worker_id; // The index in `workers` (0-n) or index+1 in `tagged_workers` (1-n)
|
||||
std::string worker_routing_id; // "w123" where 123 == worker_id; "n123" for tagged threads.
|
||||
uint32_t worker_id; // The index in `workers` (0-n) or index+1 in `tagged_workers` (1-n)
|
||||
std::string worker_routing_id; // "wXXXX" where XXXX is the raw bytes of worker_id, or tXXXX for tagged threads.
|
||||
std::string worker_routing_name; // "w123" or "t123" -- human readable version of worker_routing_id
|
||||
|
||||
/// Loads the run info with an incoming command
|
||||
run_info& load(category* cat, std::string command, ConnectionID conn, Access access, std::string remote,
|
||||
|
|
|
@ -482,7 +482,7 @@ void OxenMQ::proxy_loop_init() {
|
|||
}
|
||||
|
||||
for (auto&w : tagged_workers) {
|
||||
OMQ_LOG(debug, "Telling tagged thread worker ", std::get<run_info>(w).worker_routing_id, " to finish startup");
|
||||
OMQ_LOG(debug, "Telling tagged thread worker ", std::get<run_info>(w).worker_routing_name, " to finish startup");
|
||||
route_control(workers_socket, std::get<run_info>(w).worker_routing_id, "START");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -48,10 +48,11 @@ bool worker_wait_for(OxenMQ& omq, zmq::socket_t& sock, std::vector<zmq::message_
|
|||
}
|
||||
|
||||
void OxenMQ::worker_thread(unsigned int index, std::optional<std::string> tagged, std::function<void()> start) {
|
||||
std::string routing_id = (tagged ? "t" : "w") + std::to_string(index); // for routing
|
||||
std::string_view worker_id{tagged ? *tagged : routing_id}; // for debug
|
||||
std::string routing_id = (tagged ? "t" : "w") +
|
||||
std::string(reinterpret_cast<const char*>(&index), sizeof(index)); // for routing
|
||||
std::string worker_id{tagged ? *tagged : "w" + std::to_string(index)}; // for debug
|
||||
|
||||
[[maybe_unused]] std::string thread_name = tagged.value_or("omq-" + routing_id);
|
||||
[[maybe_unused]] std::string thread_name = tagged.value_or("omq-" + worker_id);
|
||||
#if defined(__linux__) || defined(__sun) || defined(__MINGW32__)
|
||||
if (thread_name.size() > 15) thread_name.resize(15);
|
||||
pthread_setname_np(pthread_self(), thread_name.c_str());
|
||||
|
@ -167,7 +168,8 @@ OxenMQ::run_info& OxenMQ::get_idle_worker() {
|
|||
workers.emplace_back();
|
||||
auto& r = workers.back();
|
||||
r.worker_id = id;
|
||||
r.worker_routing_id = "w" + std::to_string(id);
|
||||
r.worker_routing_id = "w" + std::string(reinterpret_cast<const char*>(&id), sizeof(id));
|
||||
r.worker_routing_name = "w" + std::to_string(id);
|
||||
return r;
|
||||
}
|
||||
size_t id = idle_workers.back();
|
||||
|
@ -182,18 +184,17 @@ void OxenMQ::proxy_worker_message(std::vector<zmq::message_t>& parts) {
|
|||
return;
|
||||
}
|
||||
auto route = view(parts[0]), cmd = view(parts[1]);
|
||||
OMQ_TRACE("worker message from ", route);
|
||||
assert(route.size() >= 2 && (route[0] == 'w' || route[0] == 't') && route[1] >= '0' && route[1] <= '9');
|
||||
if (route.size() != 5 || (route[0] != 'w' && route[0] != 't')) {
|
||||
OMQ_LOG(error, "Received malformed worker id in worker message; unable to process worker command");
|
||||
return;
|
||||
}
|
||||
bool tagged_worker = route[0] == 't';
|
||||
std::string_view worker_id_str{&route[1], route.size()-1}; // Chop off the leading "w" (or "t")
|
||||
unsigned int worker_id = oxenc::detail::extract_unsigned(worker_id_str);
|
||||
if (!worker_id_str.empty() /* didn't consume everything */ ||
|
||||
(tagged_worker
|
||||
? 0 == worker_id || worker_id > tagged_workers.size() // tagged worker ids are indexed from 1 to N (0 means untagged)
|
||||
: worker_id >= workers.size() // regular worker ids are indexed from 0 to N-1
|
||||
)
|
||||
) {
|
||||
OMQ_LOG(error, "Worker id '", route, "' is invalid, unable to process worker command");
|
||||
uint32_t worker_id;
|
||||
std::memcpy(&worker_id, route.data() + 1, 4);
|
||||
if (tagged_worker
|
||||
? 0 == worker_id || worker_id > tagged_workers.size() // tagged worker ids are indexed from 1 to N (0 means untagged)
|
||||
: worker_id >= workers.size()) { // regular worker ids are indexed from 0 to N-1
|
||||
OMQ_LOG(error, "Received invalid worker id w" + std::to_string(worker_id) + " in worker message; unable to process worker command");
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -380,7 +381,7 @@ void OxenMQ::proxy_to_worker(int64_t conn_id, zmq::socket_t& sock, std::vector<z
|
|||
peer->activity(); // outgoing connection activity, pump the activity timer
|
||||
|
||||
OMQ_TRACE("Forwarding incoming ", run.command, " from ", run.conn, " @ ", peer_address(parts[command_part_index]),
|
||||
" to worker ", run.worker_routing_id);
|
||||
" to worker ", run.worker_routing_name);
|
||||
|
||||
proxy_run_worker(run);
|
||||
category.active_threads++;
|
||||
|
@ -411,7 +412,7 @@ void OxenMQ::proxy_inject_task(injected_task task) {
|
|||
}
|
||||
|
||||
auto& run = get_idle_worker();
|
||||
OMQ_TRACE("Forwarding incoming injected task ", task.command, " from ", task.remote, " to worker ", run.worker_routing_id);
|
||||
OMQ_TRACE("Forwarding incoming injected task ", task.command, " from ", task.remote, " to worker ", run.worker_routing_name);
|
||||
run.load(&category, std::move(task.command), std::move(task.remote), std::move(task.callback));
|
||||
|
||||
proxy_run_worker(run);
|
||||
|
|
Loading…
Reference in New Issue