diff --git a/lokimq/lokimq.cpp b/lokimq/lokimq.cpp index c4660d7..d451a2e 100644 --- a/lokimq/lokimq.cpp +++ b/lokimq/lokimq.cpp @@ -19,6 +19,13 @@ constexpr char ZMQ_ADDR_ZAP[] = "inproc://zeromq.zap.01"; // LMQ_LOG(warn, "bad ", 42, " stuff"); #define LMQ_LOG(level, ...) log_(LogLevel::level, __FILE__, __LINE__, __VA_ARGS__) +#ifndef NDEBUG +// Same as LMQ_LOG(trace, ...) when not doing a release build; nothing under a release build. +# define LMQ_TRACE(...) log_(LogLevel::trace, __FILE__, __LINE__, __VA_ARGS__) +#else +# define LMQ_TRACE(...) +#endif + // This is the domain used for listening service nodes. constexpr const char AUTH_DOMAIN_SN[] = "loki.sn"; @@ -346,7 +353,7 @@ LokiMQ::LokiMQ( bind{std::move(bind_)}, peer_lookup{std::move(lookup)}, allow_connection{std::move(allow)}, logger{std::move(logger)}, poll_remote_offset{poll_internal_size + (bind.empty() ? 0 : 1)} { - LMQ_LOG(trace, "Constructing listening LokiMQ, id=", object_id, ", this=", this); + LMQ_TRACE("Constructing listening LokiMQ, id=", object_id, ", this=", this); if (pubkey.empty() != privkey.empty()) { throw std::invalid_argument("LokiMQ construction failed: one (and only one) of pubkey/privkey is empty. Both must be specified, or both empty to generate a key."); @@ -393,7 +400,7 @@ void LokiMQ::start() { LMQ_LOG(debug, "Waiting for proxy thread to get ready..."); auto &control = get_control_socket(); detail::send_control(control, "START"); - LMQ_LOG(trace, "Sent START command"); + LMQ_TRACE("Sent START command"); zmq::message_t ready_msg; std::vector parts; @@ -421,10 +428,10 @@ void LokiMQ::worker_thread(unsigned int index) { try { if (run.is_batch_job) { if (run.batch_jobno >= 0) { - LMQ_LOG(trace, "worker thread ", worker_id, " running batch ", run.batch, "#", run.batch_jobno); + LMQ_TRACE("worker thread ", worker_id, " running batch ", run.batch, "#", run.batch_jobno); run.batch->run_job(run.batch_jobno); } else if (run.batch_jobno == -1) { - LMQ_LOG(trace, "worker thread ", worker_id, " running batch ", run.batch, " completion"); + LMQ_TRACE("worker thread ", worker_id, " running batch ", run.batch, " completion"); run.batch->job_completion(); } } else { @@ -441,7 +448,7 @@ void LokiMQ::worker_thread(unsigned int index) { message.data.emplace_back(m.data(), m.size()); } - LMQ_LOG(trace, "worker thread ", worker_id, " invoking ", run.command, " callback with ", message.data.size(), " message parts"); + LMQ_TRACE("worker thread ", worker_id, " invoking ", run.command, " callback with ", message.data.size(), " message parts"); run.callback->first(message); } @@ -502,7 +509,7 @@ void LokiMQ::worker_thread(unsigned int index) { // Signal that we are ready for another job and wait for it. (We do this down here // because our first job gets set up when the thread is started). detail::send_control(sock, "RAN"); - LMQ_LOG(trace, "worker ", worker_id, " waiting for requests"); + LMQ_TRACE("worker ", worker_id, " waiting for requests"); parts.clear(); recv_message_parts(sock, parts); @@ -581,7 +588,7 @@ LokiMQ::proxy_connect_sn(const std::string &remote, const std::string &connect_h } if (result.first) { - LMQ_LOG(trace, "proxy asked to connect to ", to_hex(remote), "; reusing existing connection"); + LMQ_TRACE("proxy asked to connect to ", to_hex(remote), "; reusing existing connection"); if (outgoing) { if (peer.idle_expiry < keep_alive) { LMQ_LOG(debug, "updating existing outgoing peer connection idle expiry time from ", @@ -779,17 +786,17 @@ void LokiMQ::proxy_control_message(std::vector& parts) { if (parts.size() < 2) throw std::logic_error("Expected 2-3 message parts for a proxy control message"); auto route = view(parts[0]), cmd = view(parts[1]); - LMQ_LOG(trace, "control message: ", cmd); + LMQ_TRACE("control message: ", cmd); if (parts.size() == 3) { - LMQ_LOG(trace, "...: ", parts[2]); + LMQ_TRACE("...: ", parts[2]); if (cmd == "SEND") { - LMQ_LOG(trace, "proxying message"); + LMQ_TRACE("proxying message"); return proxy_send(view(parts[2])); } else if (cmd == "REPLY") { - LMQ_LOG(trace, "proxying reply to non-SN incoming message"); + LMQ_TRACE("proxying reply to non-SN incoming message"); return proxy_reply(view(parts[2])); } else if (cmd == "BATCH") { - LMQ_LOG(trace, "proxy batch jobs"); + LMQ_TRACE("proxy batch jobs"); auto ptrval = bt_deserialize(view(parts[2])); return proxy_batch(reinterpret_cast(ptrval)); } else if (cmd == "CONNECT_SN") { @@ -887,7 +894,7 @@ void LokiMQ::proxy_conn_cleanup() { // general workers: if we don't have any idle workers then we may still have incoming messages which // we haven't processed yet and those messages might end up resetting the last activity time. if (workers.size() < general_workers) { - LMQ_LOG(trace, "closing idle connections"); + LMQ_TRACE("closing idle connections"); proxy_expire_idle_peers(); } @@ -934,12 +941,14 @@ void LokiMQ::proxy_loop() { max_workers += cat.second.reserved_threads; } +#ifndef NDEBUG if (log_level() >= LogLevel::trace) { - LMQ_LOG(trace, "Reserving space for ", max_workers, " max workers = ", general_workers, " general + reserved:"); + LMQ_TRACE("Reserving space for ", max_workers, " max workers = ", general_workers, " general + reserved:"); for (const auto& cat : categories) - LMQ_LOG(trace, " - ", cat.first, ": ", cat.second.reserved_threads); - LMQ_LOG(trace, " - (batch jobs): ", batch_jobs_reserved); + LMQ_TRACE(" - ", cat.first, ": ", cat.second.reserved_threads); + LMQ_TRACE(" - (batch jobs): ", batch_jobs_reserved); } +#endif workers.reserve(max_workers); if (!workers.empty()) @@ -1007,7 +1016,7 @@ void LokiMQ::proxy_loop() { if (proxy_skip_poll) proxy_skip_poll = false; else { - LMQ_LOG(trace, "polling for new messages"); + LMQ_TRACE("polling for new messages"); // We poll the control socket and worker socket for any incoming messages. If we have // available worker room then also poll incoming connections and outgoing connections // for messages to forward to a worker. Otherwise, we just look for a control message @@ -1015,30 +1024,30 @@ void LokiMQ::proxy_loop() { zmq::poll(pollitems.data(), pollitems.size(), poll_timeout); } - LMQ_LOG(trace, "processing control messages"); + LMQ_TRACE("processing control messages"); // Retrieve any waiting incoming control messages for (parts.clear(); recv_message_parts(command, parts, zmq::recv_flags::dontwait); parts.clear()) { proxy_control_message(parts); } - LMQ_LOG(trace, "processing worker messages"); + LMQ_TRACE("processing worker messages"); for (parts.clear(); recv_message_parts(workers_socket, parts, zmq::recv_flags::dontwait); parts.clear()) { proxy_worker_message(parts); } - LMQ_LOG(trace, "processing timers"); + LMQ_TRACE("processing timers"); zmq_timers_execute(timers.get()); // Handle any zap authentication - LMQ_LOG(trace, "processing zap requests"); + LMQ_TRACE("processing zap requests"); process_zap_requests(zap_auth); // See if we can drain anything from the current queue before we potentially add to it // below. - LMQ_LOG(trace, "processing queued jobs and messages"); + LMQ_TRACE("processing queued jobs and messages"); proxy_process_queue(); - LMQ_LOG(trace, "processing new incoming messages"); + LMQ_TRACE("processing new incoming messages"); // We round-robin connections when pulling off pending messages one-by-one rather than // pulling off all messages from one connection before moving to the next; thus in cases of @@ -1069,7 +1078,7 @@ void LokiMQ::proxy_loop() { proxy_to_worker(i, parts); } - LMQ_LOG(trace, "done proxy loop"); + LMQ_TRACE("done proxy loop"); } } @@ -1117,7 +1126,7 @@ void LokiMQ::proxy_worker_message(std::vector& parts) { return; } auto route = view(parts[0]), cmd = view(parts[1]); - LMQ_LOG(trace, "worker message from ", route); + LMQ_TRACE("worker message from ", route); assert(route.size() >= 2 && route[0] == 'w' && route[1] >= '0' && route[1] <= '9'); string_view worker_id_str{&route[1], route.size()-1}; // Chop off the leading "w" unsigned int worker_id = detail::extract_unsigned(worker_id_str); @@ -1128,7 +1137,7 @@ void LokiMQ::proxy_worker_message(std::vector& parts) { auto& run = workers[worker_id]; - LMQ_LOG(trace, "received ", cmd, " command from ", route); + LMQ_TRACE("received ", cmd, " command from ", route); if (cmd == "RAN") { LMQ_LOG(debug, "Worker ", route, " finished ", run.command); if (run.is_batch_job) { @@ -1168,7 +1177,7 @@ void LokiMQ::proxy_worker_message(std::vector& parts) { run.cat->active_threads--; } if (max_workers == 0) { // Shutting down - LMQ_LOG(trace, "Telling worker ", route, " to quit"); + LMQ_TRACE("Telling worker ", route, " to quit"); route_control(workers_socket, route, "QUIT"); } else { idle_workers.push_back(worker_id); @@ -1219,7 +1228,7 @@ bool LokiMQ::proxy_handle_builtin(int conn_index, std::vector& p route = view(parts[0]); cmd = view(parts[1]); } - LMQ_LOG(trace, "Checking for builtins: ", cmd, " from ", peer_address(parts.back())); + LMQ_TRACE("Checking for builtins: ", cmd, " from ", peer_address(parts.back())); if (cmd == "REPLY") { size_t tag_pos = (is_outgoing_conn ? 1 : 2); @@ -1429,7 +1438,7 @@ void LokiMQ::proxy_to_worker(size_t conn_index, std::vector& par peer_info.incoming = route; } - LMQ_LOG(trace, "Forwarding incoming ", run.command, " from ", run.service_node ? "SN " : "non-SN ", + LMQ_TRACE("Forwarding incoming ", run.command, " from ", run.service_node ? "SN " : "non-SN ", to_hex(run.pubkey), " @ ", peer_address(parts.back()), " to worker ", run.routing_id); proxy_run_worker(run); @@ -1474,6 +1483,7 @@ bool LokiMQ::proxy_check_auth(string_view pubkey, size_t conn_index, const peer_ void LokiMQ::process_zap_requests(zmq::socket_t &zap_auth) { for (std::vector frames; recv_message_parts(zap_auth, frames, zmq::recv_flags::dontwait); frames.clear()) { +#ifndef NDEBUG if (log_level() >= LogLevel::trace) { std::ostringstream o; o << "Processing ZAP authentication request:"; @@ -1486,9 +1496,9 @@ void LokiMQ::process_zap_requests(zmq::socket_t &zap_auth) { o << v; } log_(LogLevel::trace, __FILE__, __LINE__, o.str()); - } else { + } else +#endif LMQ_LOG(debug, "Processing ZAP authentication request"); - } // https://rfc.zeromq.org/spec:27/ZAP/ // @@ -1571,7 +1581,7 @@ void LokiMQ::process_zap_requests(zmq::socket_t &zap_auth) { } } - LMQ_LOG(trace, "ZAP request result: ", status_code, " ", status_text); + LMQ_TRACE("ZAP request result: ", status_code, " ", status_text); std::vector response; response.reserve(response_vals.size()); @@ -1598,7 +1608,7 @@ void LokiMQ::connect_remote(string_view remote, ConnectSuccess on_connect, Conne if (!proxy_thread.joinable()) LMQ_LOG(warn, "connect_remote() called before start(); this won't take effect until start() is called"); - LMQ_LOG(trace, "telling proxy to connect to ", remote, ", expecting pubkey [", to_hex(pubkey), "]"); + LMQ_TRACE("telling proxy to connect to ", remote, ", expecting pubkey [", to_hex(pubkey), "]"); detail::send_control(get_control_socket(), "CONNECT_REMOTE", bt_serialize({ {"remote", remote}, {"pubkey", pubkey}, diff --git a/lokimq/lokimq.h b/lokimq/lokimq.h index ed09e05..ef70ba9 100644 --- a/lokimq/lokimq.h +++ b/lokimq/lokimq.h @@ -54,8 +54,9 @@ namespace lokimq { using namespace std::literals; -/// Logging levels passed into LogFunc -enum class LogLevel { trace, debug, info, warn, error, fatal }; +/// Logging levels passed into LogFunc. (Note that trace does nothing more than debug in a release +/// build). +enum class LogLevel { fatal, error, warn, info, debug, trace }; /// Authentication levels for command categories and connections enum class AuthLevel { @@ -1028,7 +1029,7 @@ void Message::send_reply(Args&&... args) { template void LokiMQ::log_(LogLevel lvl, const char* file, int line, const T&... stuff) { - if (lvl < log_level()) + if (log_level() < lvl) return; std::ostringstream os;