From 03ea49167cb145d487456c03dc1fc3098fb14a80 Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Thu, 6 Feb 2020 00:50:31 -0400 Subject: [PATCH] Various small optimizations --- lokimq/lokimq.cpp | 97 +++++++++++++++++++++++++---------------------- lokimq/lokimq.h | 2 +- 2 files changed, 52 insertions(+), 47 deletions(-) diff --git a/lokimq/lokimq.cpp b/lokimq/lokimq.cpp index 708ecf2..4ea80c1 100644 --- a/lokimq/lokimq.cpp +++ b/lokimq/lokimq.cpp @@ -64,16 +64,13 @@ void route_control(zmq::socket_t& sock, string_view identity, string_view cmd, c // Receive all the parts of a single message from the given socket. Returns true if a message was // received, false if called with flags=zmq::recv_flags::dontwait and no message was available. -template -bool recv_message_parts(zmq::socket_t &sock, OutputIt it, const zmq::recv_flags flags = zmq::recv_flags::none) { - bool more = true; - while (more) { +bool recv_message_parts(zmq::socket_t &sock, std::vector& parts, const zmq::recv_flags flags = zmq::recv_flags::none) { + do { zmq::message_t msg; if (!sock.recv(msg, flags)) return false; - more = msg.more(); - *it = std::move(msg); - } + parts.push_back(std::move(msg)); + } while (parts.back().more()); return true; } @@ -94,8 +91,12 @@ void send_message_parts(zmq::socket_t &sock, Container &&c) { send_message_parts(sock, c.begin(), c.end()); } -void send_routed_message(zmq::socket_t &socket, std::string route, std::string msg, std::string data = {}) { - std::array msgs{{create_message(std::move(route)), create_message(std::move(msg))}}; +/// Sends a message with an initial route. `msg` and `data` can be empty: if `msg` is empty then +/// the msg frame will be an empty message; if `data` is empty then the data frame will be omitted. +void send_routed_message(zmq::socket_t &socket, std::string route, std::string msg = {}, std::string data = {}) { + std::array msgs{{create_message(std::move(route))}}; + if (!msg.empty()) + msgs[1] = create_message(std::move(msg)); if (!data.empty()) msgs[2] = create_message(std::move(data)); send_message_parts(socket, msgs.begin(), data.empty() ? std::prev(msgs.end()) : msgs.end()); @@ -391,7 +392,7 @@ void LokiMQ::start() { zmq::message_t ready_msg; std::vector parts; - try { recv_message_parts(control, std::back_inserter(parts)); } + try { recv_message_parts(control, parts); } catch (const zmq::error_t &e) { throw std::runtime_error("Failure reading from LokiMQ::Proxy thread: "s + e.what()); } if (!(parts.size() == 1 && view(parts.front()) == "READY")) @@ -495,7 +496,7 @@ void LokiMQ::worker_thread(unsigned int index) { detail::send_control(sock, "RAN"); LMQ_LOG(trace, "worker ", worker_id, " waiting for requests"); parts.clear(); - recv_message_parts(sock, std::back_inserter(parts)); + recv_message_parts(sock, parts); if (parts.size() != 1) { LMQ_LOG(error, "Internal error: worker ", worker_id, " received invalid ", parts.size(), "-part worker instruction"); @@ -696,39 +697,44 @@ void LokiMQ::proxy_batch(detail::Batch* batchptr) { batch_jobs.emplace(batch, i); } -void LokiMQ::proxy_control_message(std::vector parts) { - if (parts.size() < 2 || parts.size() > 3) +void LokiMQ::proxy_control_message(std::vector& parts) { + if (parts.size() < 2) throw std::logic_error("Expected 2-3 message parts for a proxy control message"); auto route = view(parts[0]), cmd = view(parts[1]); LMQ_LOG(trace, "control message: ", cmd); - if (cmd == "SEND") { - LMQ_LOG(trace, "proxying message"); - proxy_send(bt_deserialize(view(parts.at(2)))); - } else if (cmd == "REPLY") { - LMQ_LOG(trace, "proxying reply to non-SN incoming message"); - proxy_reply(bt_deserialize(view(parts.at(2)))); - } else if (cmd == "BATCH") { - LMQ_LOG(trace, "proxy batch jobs"); - auto ptrval = bt_deserialize(view(parts.at(2))); - proxy_batch(reinterpret_cast(ptrval)); - } else if (cmd == "CONNECT") { - proxy_connect(bt_deserialize(view(parts.at(2)))); - } else if (cmd == "START") { - // Command send by the owning thread during startup; we send back a simple READY reply to - // let it know we are running. - route_control(command, route, "READY"); - } else if (cmd == "QUIT") { - // Asked to quit: set max_workers to zero and tell any idle ones to quit. We will - // close workers as they come back to READY status, and then close external - // connections once all workers are done. - max_workers = 0; - for (const auto &route : idle_workers) - route_control(workers_socket, workers[route].routing_id, "QUIT"); - idle_workers.clear(); - } else { - throw std::runtime_error("Proxy received invalid control command: " + std::string{cmd} + - " (" + std::to_string(parts.size()) + ")"); + if (parts.size() == 3) { + if (cmd == "SEND") { + LMQ_LOG(trace, "proxying message"); + return proxy_send(bt_deserialize(view(parts[2]))); + } else if (cmd == "REPLY") { + LMQ_LOG(trace, "proxying reply to non-SN incoming message"); + return proxy_reply(bt_deserialize(view(parts[2]))); + } else if (cmd == "BATCH") { + LMQ_LOG(trace, "proxy batch jobs"); + auto ptrval = bt_deserialize(view(parts[2])); + return proxy_batch(reinterpret_cast(ptrval)); + } else if (cmd == "CONNECT") { + proxy_connect(bt_deserialize(view(parts[2]))); + return; + } + } else if (parts.size() == 2) { + if (cmd == "START") { + // Command send by the owning thread during startup; we send back a simple READY reply to + // let it know we are running. + return route_control(command, route, "READY"); + } else if (cmd == "QUIT") { + // Asked to quit: set max_workers to zero and tell any idle ones to quit. We will + // close workers as they come back to READY status, and then close external + // connections once all workers are done. + max_workers = 0; + for (const auto &route : idle_workers) + route_control(workers_socket, workers[route].routing_id, "QUIT"); + idle_workers.clear(); + return; + } } + throw std::runtime_error("Proxy received invalid control command: " + std::string{cmd} + + " (" + std::to_string(parts.size()) + ")"); } auto LokiMQ::proxy_close_outgoing(decltype(peers)::iterator it) -> decltype(it) { @@ -861,12 +867,12 @@ void LokiMQ::proxy_loop() { LMQ_LOG(trace, "processing control messages"); // Retrieve any waiting incoming control messages - for (parts.clear(); recv_message_parts(command, std::back_inserter(parts), zmq::recv_flags::dontwait); parts.clear()) { - proxy_control_message(std::move(parts)); + for (parts.clear(); recv_message_parts(command, parts, zmq::recv_flags::dontwait); parts.clear()) { + proxy_control_message(parts); } LMQ_LOG(trace, "processing worker messages"); - for (parts.clear(); recv_message_parts(workers_socket, std::back_inserter(parts), zmq::recv_flags::dontwait); parts.clear()) { + for (parts.clear(); recv_message_parts(workers_socket, parts, zmq::recv_flags::dontwait); parts.clear()) { proxy_worker_message(parts); } @@ -894,7 +900,7 @@ void LokiMQ::proxy_loop() { queue_index.pop(); auto &sock = listener.connected() ? (i == 0 ? listener : remotes[i - 1].second) : remotes[i].second; - if (!recv_message_parts(sock, std::back_inserter(parts), zmq::recv_flags::dontwait)) + if (!recv_message_parts(sock, parts, zmq::recv_flags::dontwait)) continue; // We only pull this one message now but then requeue the socket so that after we check @@ -1238,8 +1244,7 @@ bool LokiMQ::proxy_check_auth(string_view pubkey, size_t conn_index, const peer_ } void LokiMQ::process_zap_requests(zmq::socket_t &zap_auth) { - std::vector frames; - for (frames.reserve(7); recv_message_parts(zap_auth, std::back_inserter(frames), zmq::recv_flags::dontwait); frames.clear()) { + for (std::vector frames; recv_message_parts(zap_auth, frames, zmq::recv_flags::dontwait); frames.clear()) { if (log_level() >= LogLevel::trace) { std::ostringstream o; o << "Processing ZAP authentication request:"; diff --git a/lokimq/lokimq.h b/lokimq/lokimq.h index 6a9aee8..d3e3e1a 100644 --- a/lokimq/lokimq.h +++ b/lokimq/lokimq.h @@ -407,7 +407,7 @@ private: void process_zap_requests(zmq::socket_t& zap_auth); /// Handles a control message from some outer thread to the proxy - void proxy_control_message(std::vector parts); + void proxy_control_message(std::vector& parts); /// Closing any idle connections that have outlived their idle time. Note that this only /// affects outgoing connections; incomings connections are the responsibility of the other end.