mirror of https://github.com/oxen-io/oxen-mq.git
Various small optimizations
This commit is contained in:
parent
f75b6cf221
commit
03ea49167c
|
@ -64,16 +64,13 @@ void route_control(zmq::socket_t& sock, string_view identity, string_view cmd, c
|
||||||
|
|
||||||
// Receive all the parts of a single message from the given socket. Returns true if a message was
|
// Receive all the parts of a single message from the given socket. Returns true if a message was
|
||||||
// received, false if called with flags=zmq::recv_flags::dontwait and no message was available.
|
// received, false if called with flags=zmq::recv_flags::dontwait and no message was available.
|
||||||
template <typename OutputIt>
|
bool recv_message_parts(zmq::socket_t &sock, std::vector<zmq::message_t>& parts, const zmq::recv_flags flags = zmq::recv_flags::none) {
|
||||||
bool recv_message_parts(zmq::socket_t &sock, OutputIt it, const zmq::recv_flags flags = zmq::recv_flags::none) {
|
do {
|
||||||
bool more = true;
|
|
||||||
while (more) {
|
|
||||||
zmq::message_t msg;
|
zmq::message_t msg;
|
||||||
if (!sock.recv(msg, flags))
|
if (!sock.recv(msg, flags))
|
||||||
return false;
|
return false;
|
||||||
more = msg.more();
|
parts.push_back(std::move(msg));
|
||||||
*it = std::move(msg);
|
} while (parts.back().more());
|
||||||
}
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -94,8 +91,12 @@ void send_message_parts(zmq::socket_t &sock, Container &&c) {
|
||||||
send_message_parts(sock, c.begin(), c.end());
|
send_message_parts(sock, c.begin(), c.end());
|
||||||
}
|
}
|
||||||
|
|
||||||
void send_routed_message(zmq::socket_t &socket, std::string route, std::string msg, std::string data = {}) {
|
/// Sends a message with an initial route. `msg` and `data` can be empty: if `msg` is empty then
|
||||||
std::array<zmq::message_t, 3> msgs{{create_message(std::move(route)), create_message(std::move(msg))}};
|
/// the msg frame will be an empty message; if `data` is empty then the data frame will be omitted.
|
||||||
|
void send_routed_message(zmq::socket_t &socket, std::string route, std::string msg = {}, std::string data = {}) {
|
||||||
|
std::array<zmq::message_t, 3> msgs{{create_message(std::move(route))}};
|
||||||
|
if (!msg.empty())
|
||||||
|
msgs[1] = create_message(std::move(msg));
|
||||||
if (!data.empty())
|
if (!data.empty())
|
||||||
msgs[2] = create_message(std::move(data));
|
msgs[2] = create_message(std::move(data));
|
||||||
send_message_parts(socket, msgs.begin(), data.empty() ? std::prev(msgs.end()) : msgs.end());
|
send_message_parts(socket, msgs.begin(), data.empty() ? std::prev(msgs.end()) : msgs.end());
|
||||||
|
@ -391,7 +392,7 @@ void LokiMQ::start() {
|
||||||
|
|
||||||
zmq::message_t ready_msg;
|
zmq::message_t ready_msg;
|
||||||
std::vector<zmq::message_t> parts;
|
std::vector<zmq::message_t> parts;
|
||||||
try { recv_message_parts(control, std::back_inserter(parts)); }
|
try { recv_message_parts(control, parts); }
|
||||||
catch (const zmq::error_t &e) { throw std::runtime_error("Failure reading from LokiMQ::Proxy thread: "s + e.what()); }
|
catch (const zmq::error_t &e) { throw std::runtime_error("Failure reading from LokiMQ::Proxy thread: "s + e.what()); }
|
||||||
|
|
||||||
if (!(parts.size() == 1 && view(parts.front()) == "READY"))
|
if (!(parts.size() == 1 && view(parts.front()) == "READY"))
|
||||||
|
@ -495,7 +496,7 @@ void LokiMQ::worker_thread(unsigned int index) {
|
||||||
detail::send_control(sock, "RAN");
|
detail::send_control(sock, "RAN");
|
||||||
LMQ_LOG(trace, "worker ", worker_id, " waiting for requests");
|
LMQ_LOG(trace, "worker ", worker_id, " waiting for requests");
|
||||||
parts.clear();
|
parts.clear();
|
||||||
recv_message_parts(sock, std::back_inserter(parts));
|
recv_message_parts(sock, parts);
|
||||||
|
|
||||||
if (parts.size() != 1) {
|
if (parts.size() != 1) {
|
||||||
LMQ_LOG(error, "Internal error: worker ", worker_id, " received invalid ", parts.size(), "-part worker instruction");
|
LMQ_LOG(error, "Internal error: worker ", worker_id, " received invalid ", parts.size(), "-part worker instruction");
|
||||||
|
@ -696,39 +697,44 @@ void LokiMQ::proxy_batch(detail::Batch* batchptr) {
|
||||||
batch_jobs.emplace(batch, i);
|
batch_jobs.emplace(batch, i);
|
||||||
}
|
}
|
||||||
|
|
||||||
void LokiMQ::proxy_control_message(std::vector<zmq::message_t> parts) {
|
void LokiMQ::proxy_control_message(std::vector<zmq::message_t>& parts) {
|
||||||
if (parts.size() < 2 || parts.size() > 3)
|
if (parts.size() < 2)
|
||||||
throw std::logic_error("Expected 2-3 message parts for a proxy control message");
|
throw std::logic_error("Expected 2-3 message parts for a proxy control message");
|
||||||
auto route = view(parts[0]), cmd = view(parts[1]);
|
auto route = view(parts[0]), cmd = view(parts[1]);
|
||||||
LMQ_LOG(trace, "control message: ", cmd);
|
LMQ_LOG(trace, "control message: ", cmd);
|
||||||
if (cmd == "SEND") {
|
if (parts.size() == 3) {
|
||||||
LMQ_LOG(trace, "proxying message");
|
if (cmd == "SEND") {
|
||||||
proxy_send(bt_deserialize<bt_dict>(view(parts.at(2))));
|
LMQ_LOG(trace, "proxying message");
|
||||||
} else if (cmd == "REPLY") {
|
return proxy_send(bt_deserialize<bt_dict>(view(parts[2])));
|
||||||
LMQ_LOG(trace, "proxying reply to non-SN incoming message");
|
} else if (cmd == "REPLY") {
|
||||||
proxy_reply(bt_deserialize<bt_dict>(view(parts.at(2))));
|
LMQ_LOG(trace, "proxying reply to non-SN incoming message");
|
||||||
} else if (cmd == "BATCH") {
|
return proxy_reply(bt_deserialize<bt_dict>(view(parts[2])));
|
||||||
LMQ_LOG(trace, "proxy batch jobs");
|
} else if (cmd == "BATCH") {
|
||||||
auto ptrval = bt_deserialize<uintptr_t>(view(parts.at(2)));
|
LMQ_LOG(trace, "proxy batch jobs");
|
||||||
proxy_batch(reinterpret_cast<detail::Batch*>(ptrval));
|
auto ptrval = bt_deserialize<uintptr_t>(view(parts[2]));
|
||||||
} else if (cmd == "CONNECT") {
|
return proxy_batch(reinterpret_cast<detail::Batch*>(ptrval));
|
||||||
proxy_connect(bt_deserialize<bt_dict>(view(parts.at(2))));
|
} else if (cmd == "CONNECT") {
|
||||||
} else if (cmd == "START") {
|
proxy_connect(bt_deserialize<bt_dict>(view(parts[2])));
|
||||||
// Command send by the owning thread during startup; we send back a simple READY reply to
|
return;
|
||||||
// let it know we are running.
|
}
|
||||||
route_control(command, route, "READY");
|
} else if (parts.size() == 2) {
|
||||||
} else if (cmd == "QUIT") {
|
if (cmd == "START") {
|
||||||
// Asked to quit: set max_workers to zero and tell any idle ones to quit. We will
|
// Command send by the owning thread during startup; we send back a simple READY reply to
|
||||||
// close workers as they come back to READY status, and then close external
|
// let it know we are running.
|
||||||
// connections once all workers are done.
|
return route_control(command, route, "READY");
|
||||||
max_workers = 0;
|
} else if (cmd == "QUIT") {
|
||||||
for (const auto &route : idle_workers)
|
// Asked to quit: set max_workers to zero and tell any idle ones to quit. We will
|
||||||
route_control(workers_socket, workers[route].routing_id, "QUIT");
|
// close workers as they come back to READY status, and then close external
|
||||||
idle_workers.clear();
|
// connections once all workers are done.
|
||||||
} else {
|
max_workers = 0;
|
||||||
throw std::runtime_error("Proxy received invalid control command: " + std::string{cmd} +
|
for (const auto &route : idle_workers)
|
||||||
" (" + std::to_string(parts.size()) + ")");
|
route_control(workers_socket, workers[route].routing_id, "QUIT");
|
||||||
|
idle_workers.clear();
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
throw std::runtime_error("Proxy received invalid control command: " + std::string{cmd} +
|
||||||
|
" (" + std::to_string(parts.size()) + ")");
|
||||||
}
|
}
|
||||||
|
|
||||||
auto LokiMQ::proxy_close_outgoing(decltype(peers)::iterator it) -> decltype(it) {
|
auto LokiMQ::proxy_close_outgoing(decltype(peers)::iterator it) -> decltype(it) {
|
||||||
|
@ -861,12 +867,12 @@ void LokiMQ::proxy_loop() {
|
||||||
|
|
||||||
LMQ_LOG(trace, "processing control messages");
|
LMQ_LOG(trace, "processing control messages");
|
||||||
// Retrieve any waiting incoming control messages
|
// Retrieve any waiting incoming control messages
|
||||||
for (parts.clear(); recv_message_parts(command, std::back_inserter(parts), zmq::recv_flags::dontwait); parts.clear()) {
|
for (parts.clear(); recv_message_parts(command, parts, zmq::recv_flags::dontwait); parts.clear()) {
|
||||||
proxy_control_message(std::move(parts));
|
proxy_control_message(parts);
|
||||||
}
|
}
|
||||||
|
|
||||||
LMQ_LOG(trace, "processing worker messages");
|
LMQ_LOG(trace, "processing worker messages");
|
||||||
for (parts.clear(); recv_message_parts(workers_socket, std::back_inserter(parts), zmq::recv_flags::dontwait); parts.clear()) {
|
for (parts.clear(); recv_message_parts(workers_socket, parts, zmq::recv_flags::dontwait); parts.clear()) {
|
||||||
proxy_worker_message(parts);
|
proxy_worker_message(parts);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -894,7 +900,7 @@ void LokiMQ::proxy_loop() {
|
||||||
queue_index.pop();
|
queue_index.pop();
|
||||||
auto &sock = listener.connected() ? (i == 0 ? listener : remotes[i - 1].second) : remotes[i].second;
|
auto &sock = listener.connected() ? (i == 0 ? listener : remotes[i - 1].second) : remotes[i].second;
|
||||||
|
|
||||||
if (!recv_message_parts(sock, std::back_inserter(parts), zmq::recv_flags::dontwait))
|
if (!recv_message_parts(sock, parts, zmq::recv_flags::dontwait))
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
// We only pull this one message now but then requeue the socket so that after we check
|
// We only pull this one message now but then requeue the socket so that after we check
|
||||||
|
@ -1238,8 +1244,7 @@ bool LokiMQ::proxy_check_auth(string_view pubkey, size_t conn_index, const peer_
|
||||||
}
|
}
|
||||||
|
|
||||||
void LokiMQ::process_zap_requests(zmq::socket_t &zap_auth) {
|
void LokiMQ::process_zap_requests(zmq::socket_t &zap_auth) {
|
||||||
std::vector<zmq::message_t> frames;
|
for (std::vector<zmq::message_t> frames; recv_message_parts(zap_auth, frames, zmq::recv_flags::dontwait); frames.clear()) {
|
||||||
for (frames.reserve(7); recv_message_parts(zap_auth, std::back_inserter(frames), zmq::recv_flags::dontwait); frames.clear()) {
|
|
||||||
if (log_level() >= LogLevel::trace) {
|
if (log_level() >= LogLevel::trace) {
|
||||||
std::ostringstream o;
|
std::ostringstream o;
|
||||||
o << "Processing ZAP authentication request:";
|
o << "Processing ZAP authentication request:";
|
||||||
|
|
|
@ -407,7 +407,7 @@ private:
|
||||||
void process_zap_requests(zmq::socket_t& zap_auth);
|
void process_zap_requests(zmq::socket_t& zap_auth);
|
||||||
|
|
||||||
/// Handles a control message from some outer thread to the proxy
|
/// Handles a control message from some outer thread to the proxy
|
||||||
void proxy_control_message(std::vector<zmq::message_t> parts);
|
void proxy_control_message(std::vector<zmq::message_t>& parts);
|
||||||
|
|
||||||
/// Closing any idle connections that have outlived their idle time. Note that this only
|
/// Closing any idle connections that have outlived their idle time. Note that this only
|
||||||
/// affects outgoing connections; incomings connections are the responsibility of the other end.
|
/// affects outgoing connections; incomings connections are the responsibility of the other end.
|
||||||
|
|
Loading…
Reference in New Issue