2020-03-13 18:02:54 +01:00
|
|
|
#include "lokimq.h"
|
|
|
|
#include "batch.h"
|
|
|
|
#include "lokimq-internal.h"
|
|
|
|
|
2020-07-06 02:21:06 +02:00
|
|
|
#if defined(__FreeBSD__) || defined(__NetBSD__) || defined(__OpenBSD__)
|
|
|
|
extern "C" {
|
|
|
|
#include <pthread.h>
|
|
|
|
#include <pthread_np.h>
|
|
|
|
}
|
|
|
|
#endif
|
|
|
|
|
2020-03-13 18:02:54 +01:00
|
|
|
namespace lokimq {
|
|
|
|
|
2020-06-05 19:06:34 +02:00
|
|
|
namespace {
|
|
|
|
|
|
|
|
// Waits for a specific command or "QUIT" on the given socket. Returns true if the command was
|
|
|
|
// received. If "QUIT" was received, replies with "QUITTING" on the socket and closes it, then
|
|
|
|
// returns false.
|
|
|
|
[[gnu::always_inline]] inline
|
|
|
|
bool worker_wait_for(LokiMQ& lmq, zmq::socket_t& sock, std::vector<zmq::message_t>& parts, const std::string_view worker_id, const std::string_view expect) {
|
|
|
|
while (true) {
|
|
|
|
lmq.log(LogLevel::debug, __FILE__, __LINE__, "worker ", worker_id, " waiting for ", expect);
|
|
|
|
parts.clear();
|
|
|
|
recv_message_parts(sock, parts);
|
|
|
|
if (parts.size() != 1) {
|
|
|
|
lmq.log(LogLevel::error, __FILE__, __LINE__, "Internal error: worker ", worker_id, " received invalid ", parts.size(), "-part control msg");
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
auto command = view(parts[0]);
|
|
|
|
if (command == expect) {
|
|
|
|
#ifndef NDEBUG
|
|
|
|
lmq.log(LogLevel::trace, __FILE__, __LINE__, "Worker ", worker_id, " received waited-for ", expect, " command");
|
|
|
|
#endif
|
|
|
|
return true;
|
|
|
|
} else if (command == "QUIT"sv) {
|
|
|
|
lmq.log(LogLevel::debug, __FILE__, __LINE__, "Worker ", worker_id, " received QUIT command, shutting down");
|
|
|
|
detail::send_control(sock, "QUITTING");
|
|
|
|
sock.setsockopt<int>(ZMQ_LINGER, 1000);
|
|
|
|
sock.close();
|
|
|
|
return false;
|
|
|
|
} else {
|
|
|
|
lmq.log(LogLevel::error, __FILE__, __LINE__, "Internal error: worker ", worker_id, " received invalid command: `", command, "'");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
2020-06-05 04:32:28 +02:00
|
|
|
void LokiMQ::worker_thread(unsigned int index, std::optional<std::string> tagged, std::function<void()> start) {
|
|
|
|
std::string routing_id = (tagged ? "t" : "w") + std::to_string(index); // for routing
|
|
|
|
std::string_view worker_id{tagged ? *tagged : routing_id}; // for debug
|
2020-04-21 16:57:38 +02:00
|
|
|
|
2020-06-05 04:32:28 +02:00
|
|
|
[[maybe_unused]] std::string thread_name = tagged.value_or("lmq-" + routing_id);
|
2020-04-21 16:57:38 +02:00
|
|
|
#if defined(__linux__) || defined(__sun) || defined(__MINGW32__)
|
2020-06-05 04:32:28 +02:00
|
|
|
if (thread_name.size() > 15) thread_name.resize(15);
|
|
|
|
pthread_setname_np(pthread_self(), thread_name.c_str());
|
2020-04-21 16:57:38 +02:00
|
|
|
#elif defined(__FreeBSD__) || defined(__NetBSD__) || defined(__OpenBSD__)
|
2020-06-05 04:32:28 +02:00
|
|
|
pthread_set_name_np(pthread_self(), thread_name.c_str());
|
2020-04-21 16:57:38 +02:00
|
|
|
#elif defined(__MACH__)
|
2020-06-05 04:32:28 +02:00
|
|
|
pthread_setname_np(thread_name.c_str());
|
2020-04-21 16:57:38 +02:00
|
|
|
#endif
|
|
|
|
|
2020-03-13 18:02:54 +01:00
|
|
|
zmq::socket_t sock{context, zmq::socket_type::dealer};
|
2020-06-05 04:32:28 +02:00
|
|
|
sock.setsockopt(ZMQ_ROUTING_ID, routing_id.data(), routing_id.size());
|
|
|
|
LMQ_LOG(debug, "New worker thread ", worker_id, " (", routing_id, ") started");
|
2020-03-13 18:02:54 +01:00
|
|
|
sock.connect(SN_ADDR_WORKERS);
|
2020-06-05 19:06:34 +02:00
|
|
|
if (tagged)
|
|
|
|
detail::send_control(sock, "STARTING");
|
2020-03-13 18:02:54 +01:00
|
|
|
|
2020-04-24 23:58:24 +02:00
|
|
|
Message message{*this, 0, AuthLevel::none, ""s};
|
2020-03-13 18:02:54 +01:00
|
|
|
std::vector<zmq::message_t> parts;
|
2020-06-05 04:32:28 +02:00
|
|
|
|
|
|
|
bool waiting_for_command;
|
|
|
|
if (tagged) {
|
|
|
|
// If we're a tagged worker then we got started up before LokiMQ started, so we need to wait
|
|
|
|
// for an all-clear signal from LokiMQ first, then we fire our `start` callback, then we can
|
|
|
|
// start waiting for commands in the main loop further down. (We also can't get the
|
|
|
|
// reference to our `tagged_workers` element until the main proxy threads is running).
|
|
|
|
|
|
|
|
waiting_for_command = true;
|
|
|
|
|
2020-06-05 19:06:34 +02:00
|
|
|
if (!worker_wait_for(*this, sock, parts, worker_id, "START"sv))
|
|
|
|
return;
|
|
|
|
if (start) start();
|
2020-06-05 04:32:28 +02:00
|
|
|
} else {
|
|
|
|
// Otherwise for a regular worker we can only be started by an active main proxy thread
|
|
|
|
// which will have preloaded our first job so we can start off right away.
|
|
|
|
waiting_for_command = false;
|
|
|
|
}
|
|
|
|
|
|
|
|
// This will always contains the current job, and is guaranteed to never be invalidated.
|
|
|
|
run_info& run = tagged ? std::get<run_info>(tagged_workers[index - 1]) : workers[index];
|
2020-03-13 18:02:54 +01:00
|
|
|
|
|
|
|
while (true) {
|
2020-06-05 19:06:34 +02:00
|
|
|
if (waiting_for_command) {
|
|
|
|
if (!worker_wait_for(*this, sock, parts, worker_id, "RUN"sv))
|
2020-06-05 04:32:28 +02:00
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
2020-03-13 18:02:54 +01:00
|
|
|
try {
|
|
|
|
if (run.is_batch_job) {
|
2020-06-08 02:30:46 +02:00
|
|
|
auto* batch = std::get<detail::Batch*>(run.to_run);
|
2020-03-13 18:02:54 +01:00
|
|
|
if (run.batch_jobno >= 0) {
|
2020-06-08 02:30:46 +02:00
|
|
|
LMQ_TRACE("worker thread ", worker_id, " running batch ", batch, "#", run.batch_jobno);
|
|
|
|
batch->run_job(run.batch_jobno);
|
2020-03-13 18:02:54 +01:00
|
|
|
} else if (run.batch_jobno == -1) {
|
2020-06-08 02:30:46 +02:00
|
|
|
LMQ_TRACE("worker thread ", worker_id, " running batch ", batch, " completion");
|
|
|
|
batch->job_completion();
|
2020-03-13 18:02:54 +01:00
|
|
|
}
|
2020-06-08 02:30:46 +02:00
|
|
|
} else if (run.is_injected) {
|
|
|
|
auto& func = std::get<std::function<void()>>(run.to_run);
|
|
|
|
LMQ_TRACE("worker thread ", worker_id, " invoking injected command ", run.command);
|
|
|
|
func();
|
|
|
|
func = nullptr;
|
2020-03-13 18:02:54 +01:00
|
|
|
} else {
|
|
|
|
message.conn = run.conn;
|
2020-04-24 02:52:39 +02:00
|
|
|
message.access = run.access;
|
2020-04-24 23:58:24 +02:00
|
|
|
message.remote = std::move(run.remote);
|
2020-03-13 18:02:54 +01:00
|
|
|
message.data.clear();
|
|
|
|
|
2020-04-24 23:58:24 +02:00
|
|
|
LMQ_TRACE("Got incoming command from ", message.remote, "/", message.conn, message.conn.route.empty() ? " (outgoing)" : " (incoming)");
|
2020-03-13 18:02:54 +01:00
|
|
|
|
2020-06-08 02:30:46 +02:00
|
|
|
auto& [callback, is_request] = *std::get<const std::pair<CommandCallback, bool>*>(run.to_run);
|
|
|
|
if (is_request) {
|
2020-03-13 18:02:54 +01:00
|
|
|
message.reply_tag = {run.data_parts[0].data<char>(), run.data_parts[0].size()};
|
|
|
|
for (auto it = run.data_parts.begin() + 1; it != run.data_parts.end(); ++it)
|
|
|
|
message.data.emplace_back(it->data<char>(), it->size());
|
|
|
|
} else {
|
|
|
|
for (auto& m : run.data_parts)
|
|
|
|
message.data.emplace_back(m.data<char>(), m.size());
|
|
|
|
}
|
|
|
|
|
|
|
|
LMQ_TRACE("worker thread ", worker_id, " invoking ", run.command, " callback with ", message.data.size(), " message parts");
|
2020-06-08 02:30:46 +02:00
|
|
|
callback(message);
|
2020-03-13 18:02:54 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
catch (const bt_deserialize_invalid& e) {
|
|
|
|
LMQ_LOG(warn, worker_id, " deserialization failed: ", e.what(), "; ignoring request");
|
|
|
|
}
|
2020-05-15 01:08:34 +02:00
|
|
|
catch (const std::bad_variant_access& e) {
|
2020-03-13 18:02:54 +01:00
|
|
|
LMQ_LOG(warn, worker_id, " deserialization failed: found unexpected serialized type (", e.what(), "); ignoring request");
|
|
|
|
}
|
|
|
|
catch (const std::out_of_range& e) {
|
|
|
|
LMQ_LOG(warn, worker_id, " deserialization failed: invalid data - required field missing (", e.what(), "); ignoring request");
|
|
|
|
}
|
|
|
|
catch (const std::exception& e) {
|
|
|
|
LMQ_LOG(warn, worker_id, " caught exception when processing command: ", e.what());
|
|
|
|
}
|
|
|
|
catch (...) {
|
|
|
|
LMQ_LOG(warn, worker_id, " caught non-standard exception when processing command");
|
|
|
|
}
|
|
|
|
|
2020-06-05 04:32:28 +02:00
|
|
|
// Tell the proxy thread that we are ready for another job
|
|
|
|
detail::send_control(sock, "RAN");
|
|
|
|
waiting_for_command = true;
|
2020-03-13 18:02:54 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Returns the run_info slot of a worker that is free to take a job.
//
// The most recently idled worker (back of `idle_workers`) is reused when there is one; otherwise a
// new slot is appended to `workers` and given the next id and the matching "w<id>" routing id.
// Appending relies on `workers` having spare capacity (asserted), so the append does not
// reallocate.
LokiMQ::run_info& LokiMQ::get_idle_worker() {
    if (!idle_workers.empty()) {
        const size_t reuse_id = idle_workers.back();
        idle_workers.pop_back();
        return workers[reuse_id];
    }

    // Nobody idle: create a brand new worker slot at the end.
    const size_t new_id = workers.size();
    assert(workers.capacity() > new_id);
    auto& fresh = workers.emplace_back();
    fresh.worker_id = new_id;
    fresh.worker_routing_id = "w" + std::to_string(new_id);
    return fresh;
}
|
|
|
|
|
|
|
|
// Handles a control message that a worker thread sent to the proxy.
//
// `parts` must hold exactly two parts: the worker's routing id ("w<N>" for regular workers,
// "t<N>" for tagged workers, where tagged ids start at 1) and the command.  Recognized commands:
//
// - "RAN": the worker finished its current job.  Job accounting is updated (batch bookkeeping,
//   including running or scheduling the batch completion, or the category's active thread count),
//   and then the worker is either told to QUIT (when shutting down) or, if it is a regular worker,
//   put back on the idle list.
// - "QUITTING": the worker is exiting; its thread is joined.
//
// Malformed messages, invalid worker ids and unknown commands are logged and ignored.
void LokiMQ::proxy_worker_message(std::vector<zmq::message_t>& parts) {
    // Process messages sent by workers
    if (parts.size() != 2) {
        LMQ_LOG(error, "Received invalid ", parts.size(), "-part message from a worker");
        return;
    }
    auto route = view(parts[0]), cmd = view(parts[1]);
    LMQ_TRACE("worker message from ", route);

    // The route format is asserted for debug builds, but asserts vanish under NDEBUG, so it is
    // also checked at runtime: without this a too-short route would be indexed out of bounds
    // below, and an unknown prefix would silently be treated as a regular worker.
    const bool route_ok = route.size() >= 2 && (route[0] == 'w' || route[0] == 't') && route[1] >= '0' && route[1] <= '9';
    assert(route_ok);
    if (!route_ok) {
        LMQ_LOG(error, "Worker id '", route, "' is invalid, unable to process worker command");
        return;
    }

    bool tagged_worker = route[0] == 't';
    std::string_view worker_id_str = route.substr(1); // Chop off the leading "w" (or "t")
    unsigned int worker_id = detail::extract_unsigned(worker_id_str);
    if (!worker_id_str.empty() /* didn't consume everything */ ||
            (tagged_worker
                ? 0 == worker_id || worker_id > tagged_workers.size() // tagged worker ids are indexed from 1 to N (0 means untagged)
                : worker_id >= workers.size() // regular worker ids are indexed from 0 to N-1
            )
    ) {
        LMQ_LOG(error, "Worker id '", route, "' is invalid, unable to process worker command");
        return;
    }

    auto& run = tagged_worker ? std::get<run_info>(tagged_workers[worker_id - 1]) : workers[worker_id];

    LMQ_TRACE("received ", cmd, " command from ", route);
    if (cmd == "RAN"sv) {
        LMQ_TRACE("Worker ", route, " finished ", run.is_batch_job ? "batch job" : run.command);
        if (run.is_batch_job) {
            if (tagged_worker) {
                // Clear the tagged worker's bool flag now that its job has finished.
                std::get<bool>(tagged_workers[worker_id - 1]) = false;
            } else {
                auto& active = run.is_reply_job ? reply_jobs_active : batch_jobs_active;
                assert(active > 0);
                active--;
            }
            bool clear_job = false;
            auto* batch = std::get<detail::Batch*>(run.to_run);
            if (run.batch_jobno == -1) {
                // Returned from the completion function
                clear_job = true;
            } else {
                auto [state, thread] = batch->job_finished();
                if (state == detail::BatchState::complete) {
                    if (thread == -1) { // run directly in proxy
                        LMQ_TRACE("Completion job running directly in proxy");
                        try {
                            batch->job_completion(); // RUN DIRECTLY IN PROXY THREAD
                        } catch (const std::exception &e) {
                            // Raise these to error levels: the caller really shouldn't be doing
                            // anything non-trivial in an in-proxy completion function!
                            LMQ_LOG(error, "proxy thread caught exception when processing in-proxy completion command: ", e.what());
                        } catch (...) {
                            LMQ_LOG(error, "proxy thread caught non-standard exception when processing in-proxy completion command");
                        }
                        clear_job = true;
                    } else {
                        // Schedule the completion (job number -1) on the appropriate queue.
                        auto& jobs =
                            thread > 0
                            ? std::get<std::queue<batch_job>>(tagged_workers[thread - 1]) // run in tagged thread
                            : run.is_reply_job
                              ? reply_jobs
                              : batch_jobs;
                        jobs.emplace(batch, -1);
                    }
                } else if (state == detail::BatchState::done) {
                    // No completion job
                    clear_job = true;
                }
                // else the job is still running
            }

            if (clear_job) {
                // The batch is finished for good: forget it, free it, and clear the slot's pointer.
                batches.erase(batch);
                delete batch;
                run.to_run = static_cast<detail::Batch*>(nullptr);
            }
        } else {
            assert(run.cat->active_threads > 0);
            run.cat->active_threads--;
        }
        if (max_workers == 0) { // Shutting down
            LMQ_TRACE("Telling worker ", route, " to quit");
            route_control(workers_socket, route, "QUIT");
        } else if (!tagged_worker) {
            // Only regular workers go on the idle list.
            idle_workers.push_back(worker_id);
        }
    } else if (cmd == "QUITTING"sv) {
        run.worker_thread.join();
        LMQ_LOG(debug, "Worker ", route, " exited normally");
    } else {
        LMQ_LOG(error, "Worker ", route, " sent unknown control message: `", cmd, "'");
    }
}
|
|
|
|
|
|
|
|
// Sets the worker that owns `run` going on the job loaded in `run`.
//
// A worker whose thread already exists is woken with a "RUN" message routed to its routing id.
// A worker without a (joinable) thread gets a new thread running worker_thread() for its id.
void LokiMQ::proxy_run_worker(run_info& run) {
    if (run.worker_thread.joinable()) {
        send_routed_message(workers_socket, run.worker_routing_id, "RUN");
        return;
    }

    run.worker_thread = std::thread{[this, id=run.worker_id] { worker_thread(id); }};
}
|
|
|
|
|
|
|
|
void LokiMQ::proxy_to_worker(size_t conn_index, std::vector<zmq::message_t>& parts) {
|
|
|
|
bool outgoing = connections[conn_index].getsockopt<int>(ZMQ_TYPE) == ZMQ_DEALER;
|
|
|
|
|
|
|
|
peer_info tmp_peer;
|
|
|
|
tmp_peer.conn_index = conn_index;
|
|
|
|
if (!outgoing) tmp_peer.route = parts[0].to_string();
|
|
|
|
peer_info* peer = nullptr;
|
|
|
|
if (outgoing) {
|
|
|
|
auto it = peers.find(conn_index_to_id[conn_index]);
|
|
|
|
if (it == peers.end()) {
|
|
|
|
LMQ_LOG(warn, "Internal error: connection index ", conn_index, " not found");
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
peer = &it->second;
|
|
|
|
} else {
|
1.1.0: invocation-time SN auth; failure responses
This replaces the recognition of SN status to be checked per-command
invocation rather than on connection. As this breaks the API quite
substantially, though doesn't really affect the functionality, it seems
suitable to bump the minor version.
This requires a fundamental shift in how the calling application tells
LokiMQ about service nodes: rather than using a callback invoked on
connection, the application now has to call set_active_sns() (or the
more efficient update_active_sns(), if changes are readily available) to
update the list whenever it changes. LokiMQ then keeps this list
internally and uses it when determining whether to invoke.
This release also brings better request responses on errors: when a
request fails, the data argument will now be set to the failure reason,
one of:
- TIMEOUT
- UNKNOWNCOMMAND
- NOT_A_SERVICE_NODE (the remote isn't running in SN mode)
- FORBIDDEN (auth level denies the request)
- FORBIDDEN_SN (SN required and the remote doesn't see us as a SN)
Some of these (UNKNOWNCOMMAND, NOT_A_SERVICE_NODE, FORBIDDEN) were
already sent by remotes, but there was no connection to a request and so
they would log a warning, but the request would have to time out.
These errors (minus TIMEOUT, plus NO_REPLY_TAG signalling that a command
is a request but didn't include a reply tag) are also sent in response
to regular commands, but they simply result in a log warning showing the
error type and the command that caused the failure when received.
2020-04-13 00:57:19 +02:00
|
|
|
std::tie(tmp_peer.pubkey, tmp_peer.auth_level) = detail::extract_metadata(parts.back());
|
|
|
|
tmp_peer.service_node = tmp_peer.pubkey.size() == 32 && active_service_nodes.count(tmp_peer.pubkey);
|
|
|
|
|
2020-03-13 18:02:54 +01:00
|
|
|
if (tmp_peer.service_node) {
|
|
|
|
// It's a service node so we should have a peer_info entry; see if we can find one with
|
|
|
|
// the same route, and if not, add one.
|
|
|
|
auto pr = peers.equal_range(tmp_peer.pubkey);
|
|
|
|
for (auto it = pr.first; it != pr.second; ++it) {
|
1.1.0: invocation-time SN auth; failure responses
This replaces the recognition of SN status to be checked per-command
invocation rather than on connection. As this breaks the API quite
substantially, though doesn't really affect the functionality, it seems
suitable to bump the minor version.
This requires a fundamental shift in how the calling application tells
LokiMQ about service nodes: rather than using a callback invoked on
connection, the application now has to call set_active_sns() (or the
more efficient update_active_sns(), if changes are readily available) to
update the list whenever it changes. LokiMQ then keeps this list
internally and uses it when determining whether to invoke.
This release also brings better request responses on errors: when a
request fails, the data argument will now be set to the failure reason,
one of:
- TIMEOUT
- UNKNOWNCOMMAND
- NOT_A_SERVICE_NODE (the remote isn't running in SN mode)
- FORBIDDEN (auth level denies the request)
- FORBIDDEN_SN (SN required and the remote doesn't see us as a SN)
Some of these (UNKNOWNCOMMAND, NOT_A_SERVICE_NODE, FORBIDDEN) were
already sent by remotes, but there was no connection to a request and so
they would log a warning, but the request would have to time out.
These errors (minus TIMEOUT, plus NO_REPLY_TAG signalling that a command
is a request but didn't include a reply tag) are also sent in response
to regular commands, but they simply result in a log warning showing the
error type and the command that caused the failure when received.
2020-04-13 00:57:19 +02:00
|
|
|
if (it->second.conn_index == tmp_peer.conn_index && it->second.route == tmp_peer.route) {
|
2020-03-13 18:02:54 +01:00
|
|
|
peer = &it->second;
|
1.1.0: invocation-time SN auth; failure responses
This replaces the recognition of SN status to be checked per-command
invocation rather than on connection. As this breaks the API quite
substantially, though doesn't really affect the functionality, it seems
suitable to bump the minor version.
This requires a fundamental shift in how the calling application tells
LokiMQ about service nodes: rather than using a callback invoked on
connection, the application now has to call set_active_sns() (or the
more efficient update_active_sns(), if changes are readily available) to
update the list whenever it changes. LokiMQ then keeps this list
internally and uses it when determining whether to invoke.
This release also brings better request responses on errors: when a
request fails, the data argument will now be set to the failure reason,
one of:
- TIMEOUT
- UNKNOWNCOMMAND
- NOT_A_SERVICE_NODE (the remote isn't running in SN mode)
- FORBIDDEN (auth level denies the request)
- FORBIDDEN_SN (SN required and the remote doesn't see us as a SN)
Some of these (UNKNOWNCOMMAND, NOT_A_SERVICE_NODE, FORBIDDEN) were
already sent by remotes, but there was no connection to a request and so
they would log a warning, but the request would have to time out.
These errors (minus TIMEOUT, plus NO_REPLY_TAG signalling that a command
is a request but didn't include a reply tag) are also sent in response
to regular commands, but they simply result in a log warning showing the
error type and the command that caused the failure when received.
2020-04-13 00:57:19 +02:00
|
|
|
// Update the stored auth level just in case the peer reconnected
|
|
|
|
peer->auth_level = tmp_peer.auth_level;
|
2020-03-13 18:02:54 +01:00
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if (!peer) {
|
1.1.0: invocation-time SN auth; failure responses
This replaces the recognition of SN status to be checked per-command
invocation rather than on connection. As this breaks the API quite
substantially, though doesn't really affect the functionality, it seems
suitable to bump the minor version.
This requires a fundamental shift in how the calling application tells
LokiMQ about service nodes: rather than using a callback invoked on
connection, the application now has to call set_active_sns() (or the
more efficient update_active_sns(), if changes are readily available) to
update the list whenever it changes. LokiMQ then keeps this list
internally and uses it when determining whether to invoke.
This release also brings better request responses on errors: when a
request fails, the data argument will now be set to the failure reason,
one of:
- TIMEOUT
- UNKNOWNCOMMAND
- NOT_A_SERVICE_NODE (the remote isn't running in SN mode)
- FORBIDDEN (auth level denies the request)
- FORBIDDEN_SN (SN required and the remote doesn't see us as a SN)
Some of these (UNKNOWNCOMMAND, NOT_A_SERVICE_NODE, FORBIDDEN) were
already sent by remotes, but there was no connection to a request and so
they would log a warning, but the request would have to time out.
These errors (minus TIMEOUT, plus NO_REPLY_TAG signalling that a command
is a request but didn't include a reply tag) are also sent in response
to regular commands, but they simply result in a log warning showing the
error type and the command that caused the failure when received.
2020-04-13 00:57:19 +02:00
|
|
|
// We don't have a record: this is either a new SN connection or a new message on a
|
|
|
|
// connection that recently gained SN status.
|
2020-03-13 18:02:54 +01:00
|
|
|
peer = &peers.emplace(ConnectionID{tmp_peer.pubkey}, std::move(tmp_peer))->second;
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
// Incoming, non-SN connection: we don't store a peer_info for this, so just use the
|
|
|
|
// temporary one
|
|
|
|
peer = &tmp_peer;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
size_t command_part_index = outgoing ? 0 : 1;
|
|
|
|
std::string command = parts[command_part_index].to_string();
|
|
|
|
|
|
|
|
// Steal any data message parts
|
|
|
|
size_t data_part_index = command_part_index + 1;
|
|
|
|
std::vector<zmq::message_t> data_parts;
|
|
|
|
data_parts.reserve(parts.size() - data_part_index);
|
|
|
|
for (auto it = parts.begin() + data_part_index; it != parts.end(); ++it)
|
|
|
|
data_parts.push_back(std::move(*it));
|
|
|
|
|
1.1.0: invocation-time SN auth; failure responses
This replaces the recognition of SN status to be checked per-command
invocation rather than on connection. As this breaks the API quite
substantially, though doesn't really affect the functionality, it seems
suitable to bump the minor version.
This requires a fundamental shift in how the calling application tells
LokiMQ about service nodes: rather than using a callback invoked on
connection, the application now has to call set_active_sns() (or the
more efficient update_active_sns(), if changes are readily available) to
update the list whenever it changes. LokiMQ then keeps this list
internally and uses it when determining whether to invoke.
This release also brings better request responses on errors: when a
request fails, the data argument will now be set to the failure reason,
one of:
- TIMEOUT
- UNKNOWNCOMMAND
- NOT_A_SERVICE_NODE (the remote isn't running in SN mode)
- FORBIDDEN (auth level denies the request)
- FORBIDDEN_SN (SN required and the remote doesn't see us as a SN)
Some of these (UNKNOWNCOMMAND, NOT_A_SERVICE_NODE, FORBIDDEN) were
already sent by remotes, but there was no connection to a request and so
they would log a warning, but the request would have to time out.
These errors (minus TIMEOUT, plus NO_REPLY_TAG signalling that a command
is a request but didn't include a reply tag) are also sent in response
to regular commands, but they simply result in a log warning showing the
error type and the command that caused the failure when received.
2020-04-13 00:57:19 +02:00
|
|
|
auto cat_call = get_command(command);
|
|
|
|
|
|
|
|
// Check that command is valid, that we have permission, etc.
|
|
|
|
if (!proxy_check_auth(conn_index, outgoing, *peer, parts[command_part_index], cat_call, data_parts))
|
|
|
|
return;
|
|
|
|
|
|
|
|
auto& category = *cat_call.first;
|
2020-04-24 02:52:39 +02:00
|
|
|
Access access{peer->auth_level, peer->service_node, local_service_node};
|
1.1.0: invocation-time SN auth; failure responses
This replaces the recognition of SN status to be checked per-command
invocation rather than on connection. As this breaks the API quite
substantially, though doesn't really affect the functionality, it seems
suitable to bump the minor version.
This requires a fundamental shift in how the calling application tells
LokiMQ about service nodes: rather than using a callback invoked on
connection, the application now has to call set_active_sns() (or the
more efficient update_active_sns(), if changes are readily available) to
update the list whenever it changes. LokiMQ then keeps this list
internally and uses it when determining whether to invoke.
This release also brings better request responses on errors: when a
request fails, the data argument will now be set to the failure reason,
one of:
- TIMEOUT
- UNKNOWNCOMMAND
- NOT_A_SERVICE_NODE (the remote isn't running in SN mode)
- FORBIDDEN (auth level denies the request)
- FORBIDDEN_SN (SN required and the remote doesn't see us as a SN)
Some of these (UNKNOWNCOMMAND, NOT_A_SERVICE_NODE, FORBIDDEN) were
already sent by remotes, but there was no connection to a request and so
they would log a warning, but the request would have to time out.
These errors (minus TIMEOUT, plus NO_REPLY_TAG signalling that a command
is a request but didn't include a reply tag) are also sent in response
to regular commands, but they simply result in a log warning showing the
error type and the command that caused the failure when received.
2020-04-13 00:57:19 +02:00
|
|
|
|
2020-03-13 18:02:54 +01:00
|
|
|
if (category.active_threads >= category.reserved_threads && active_workers() >= general_workers) {
|
|
|
|
// No free reserved or general spots, try to queue it for later
|
|
|
|
if (category.max_queue >= 0 && category.queued >= category.max_queue) {
|
|
|
|
LMQ_LOG(warn, "No space to queue incoming command ", command, "; already have ", category.queued,
|
|
|
|
"commands queued in that category (max ", category.max_queue, "); dropping message");
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
LMQ_LOG(debug, "No available free workers, queuing ", command, " for later");
|
|
|
|
ConnectionID conn{peer->service_node ? ConnectionID::SN_ID : conn_index_to_id[conn_index].id, peer->pubkey, std::move(tmp_peer.route)};
|
2020-04-24 23:58:24 +02:00
|
|
|
pending_commands.emplace_back(category, std::move(command), std::move(data_parts), cat_call.second,
|
|
|
|
std::move(conn), std::move(access), peer_address(parts[command_part_index]));
|
2020-03-13 18:02:54 +01:00
|
|
|
category.queued++;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (cat_call.second->second /*is_request*/ && data_parts.empty()) {
|
|
|
|
LMQ_LOG(warn, "Received an invalid request command with no reply tag; dropping message");
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
auto& run = get_idle_worker();
|
|
|
|
{
|
|
|
|
ConnectionID c{peer->service_node ? ConnectionID::SN_ID : conn_index_to_id[conn_index].id, peer->pubkey};
|
|
|
|
c.route = std::move(tmp_peer.route);
|
|
|
|
if (outgoing || peer->service_node)
|
|
|
|
tmp_peer.route.clear();
|
2020-04-24 23:58:24 +02:00
|
|
|
run.load(&category, std::move(command), std::move(c), std::move(access), peer_address(parts[command_part_index]),
|
|
|
|
std::move(data_parts), cat_call.second);
|
2020-03-13 18:02:54 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
if (outgoing)
|
|
|
|
peer->activity(); // outgoing connection activity, pump the activity timer
|
|
|
|
|
|
|
|
LMQ_TRACE("Forwarding incoming ", run.command, " from ", run.conn, " @ ", peer_address(parts[command_part_index]),
|
|
|
|
" to worker ", run.worker_routing_id);
|
|
|
|
|
|
|
|
proxy_run_worker(run);
|
|
|
|
category.active_threads++;
|
|
|
|
}
|
|
|
|
|
2020-06-08 02:30:46 +02:00
|
|
|
void LokiMQ::inject_task(const std::string& category, std::string command, std::string remote, std::function<void()> callback) {
|
|
|
|
if (!callback) return;
|
|
|
|
auto it = categories.find(category);
|
|
|
|
if (it == categories.end())
|
|
|
|
throw std::out_of_range{"Invalid category `" + category + "': category does not exist"};
|
|
|
|
detail::send_control(get_control_socket(), "INJECT", bt_serialize(detail::serialize_object(
|
|
|
|
injected_task{it->second, std::move(command), std::move(remote), std::move(callback)})));
|
|
|
|
}
|
|
|
|
|
|
|
|
// Schedules an injected task from within the proxy thread.
//
// If the task's category has a free reserved slot or a general worker slot is available, the task
// is loaded into an idle worker, the worker is started, and the category's active thread count is
// bumped.  Otherwise the task is appended to `pending_commands` (and the category's queued count
// incremented), unless the category's queue is already at its maximum, in which case the task is
// dropped with a warning.  A negative `max_queue` disables the queue limit.
void LokiMQ::proxy_inject_task(injected_task task) {
    auto& category = task.cat;
    if (category.active_threads >= category.reserved_threads && active_workers() >= general_workers) {
        // No free worker slot, queue for later
        if (category.max_queue >= 0 && category.queued >= category.max_queue) {
            LMQ_LOG(warn, "No space to queue injected task ", task.command, "; already have ", category.queued,
                    " commands queued in that category (max ", category.max_queue, "); dropping task");
            return;
        }
        LMQ_LOG(debug, "No available free workers for injected task ", task.command, "; queuing for later");
        pending_commands.emplace_back(category, std::move(task.command), std::move(task.callback), std::move(task.remote));
        category.queued++;
        return;
    }

    auto& run = get_idle_worker();
    // Trace before load(): the task's strings are moved from by the load call below.
    LMQ_TRACE("Forwarding incoming injected task ", task.command, " from ", task.remote, " to worker ", run.worker_routing_id);
    run.load(&category, std::move(task.command), std::move(task.remote), std::move(task.callback));

    proxy_run_worker(run);
    category.active_threads++;
}
|
|
|
|
|
|
|
|
|
2020-03-13 18:02:54 +01:00
|
|
|
|
|
|
|
}
|