Use fixed array for known-small internal messages

Internal messages (control messages, worker messages) are always 3 parts
or less, so we can optimize by using a stack allocated std::array for
those cases rather than needing to continually clear and expand a heap
allocated vector.
This commit is contained in:
Jason Rhinelander 2022-05-12 12:42:08 -03:00
parent b8e4eb148f
commit 45791d3a19
No known key found for this signature in database
GPG Key ID: C4992CE7A88D4262
4 changed files with 35 additions and 14 deletions

View File

@ -86,6 +86,21 @@ inline bool recv_message_parts(zmq::socket_t& sock, std::vector<zmq::message_t>&
return true;
}
// Same as above, but using a fixed sized array; this is only used for internal jobs (e.g. control
// messages) where we know the message parts should never exceed a given size (this function does
// not bounds check except in debug builds). Returns the number of message parts received, or 0 on
// read error.
template <size_t N>
inline size_t recv_message_parts(zmq::socket_t& sock, std::array<zmq::message_t, N>& parts, const zmq::recv_flags flags = zmq::recv_flags::none) {
for (size_t count = 0; ; count++) {
assert(count < N);
if (!sock.recv(parts[count], flags))
return 0;
if (!parts[count].more())
return count + 1;
}
}
inline const char* peer_address(zmq::message_t& msg) {
try { return msg.gets("Peer-Address"); } catch (...) {}
return "(unknown)";

View File

@ -484,7 +484,9 @@ private:
void proxy_conn_cleanup();
void proxy_worker_message(std::vector<zmq::message_t>& parts);
using control_message_array = std::array<zmq::message_t, 3>;
void proxy_worker_message(control_message_array& parts, size_t len);
void proxy_process_queue();
@ -608,7 +610,7 @@ private:
void process_zap_requests();
/// Handles a control message from some outer thread to the proxy
void proxy_control_message(std::vector<zmq::message_t>& parts);
void proxy_control_message(control_message_array& parts, size_t len);
/// Closing any idle connections that have outlived their idle time. Note that this only
/// affects outgoing connections; incomings connections are the responsibility of the other end.

View File

@ -271,14 +271,14 @@ void OxenMQ::proxy_reply(oxenc::bt_dict_consumer data) {
}
}
void OxenMQ::proxy_control_message(std::vector<zmq::message_t>& parts) {
void OxenMQ::proxy_control_message(OxenMQ::control_message_array& parts, size_t len) {
// We throw an uncaught exception here because we only generate control messages internally in
// oxenmq code: if one of these condition fail it's a oxenmq bug.
if (parts.size() < 2)
if (len < 2)
throw std::logic_error("OxenMQ bug: Expected 2-3 message parts for a proxy control message");
auto route = view(parts[0]), cmd = view(parts[1]);
OMQ_TRACE("control message: ", cmd);
if (parts.size() == 3) {
if (len == 3) {
OMQ_TRACE("...: ", parts[2]);
auto data = view(parts[2]);
if (cmd == "SEND") {
@ -315,7 +315,7 @@ void OxenMQ::proxy_control_message(std::vector<zmq::message_t>& parts) {
bind.push_back(std::move(b));
return;
}
} else if (parts.size() == 2) {
} else if (len == 2) {
if (cmd == "START") {
// Command send by the owning thread during startup; we send back a simple READY reply to
// let it know we are running.
@ -335,7 +335,7 @@ void OxenMQ::proxy_control_message(std::vector<zmq::message_t>& parts) {
}
}
throw std::runtime_error("OxenMQ bug: Proxy received invalid control command: " +
std::string{cmd} + " (" + std::to_string(parts.size()) + ")");
std::string{cmd} + " (" + std::to_string(len) + ")");
}
bool OxenMQ::proxy_bind(bind_data& b, size_t bind_index) {
@ -497,6 +497,10 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
}
startup.set_value();
// Fixed array used for worker and control messages: these are never longer than 3 parts:
std::array<zmq::message_t, 3> control_parts;
// General vector for handling incoming messages:
std::vector<zmq::message_t> parts;
while (true) {
@ -529,13 +533,13 @@ void OxenMQ::proxy_loop(std::promise<void> startup) {
OMQ_TRACE("processing control messages");
// Retrieve any waiting incoming control messages
for (parts.clear(); recv_message_parts(command, parts, zmq::recv_flags::dontwait); parts.clear()) {
proxy_control_message(parts);
while (size_t len = recv_message_parts(command, control_parts, zmq::recv_flags::dontwait)) {
proxy_control_message(control_parts, len);
}
OMQ_TRACE("processing worker messages");
for (parts.clear(); recv_message_parts(workers_socket, parts, zmq::recv_flags::dontwait); parts.clear()) {
proxy_worker_message(parts);
while (size_t len = recv_message_parts(workers_socket, control_parts, zmq::recv_flags::dontwait)) {
proxy_worker_message(control_parts, len);
}
OMQ_TRACE("processing timers");

View File

@ -177,10 +177,10 @@ OxenMQ::run_info& OxenMQ::get_idle_worker() {
return workers[id];
}
void OxenMQ::proxy_worker_message(std::vector<zmq::message_t>& parts) {
void OxenMQ::proxy_worker_message(OxenMQ::control_message_array& parts, size_t len) {
// Process messages sent by workers
if (parts.size() != 2) {
OMQ_LOG(error, "Received send invalid ", parts.size(), "-part message");
if (len != 2) {
OMQ_LOG(error, "Received send invalid ", len, "-part message");
return;
}
auto route = view(parts[0]), cmd = view(parts[1]);