From f4f1506df0d46c5e2d2f2db0295848f7f8b16930 Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Fri, 24 Apr 2020 18:58:24 -0300 Subject: [PATCH] Add remote address into Message object Can be useful for end point logging. --- lokimq/lokimq.cpp | 5 +++-- lokimq/lokimq.h | 13 ++++++++----- lokimq/message.h | 4 +++- lokimq/worker.cpp | 11 +++++++---- 4 files changed, 21 insertions(+), 12 deletions(-) diff --git a/lokimq/lokimq.cpp b/lokimq/lokimq.cpp index bf89467..eef35be 100644 --- a/lokimq/lokimq.cpp +++ b/lokimq/lokimq.cpp @@ -341,7 +341,7 @@ void LokiMQ::set_general_threads(int threads) { general_workers = threads; } -LokiMQ::run_info& LokiMQ::run_info::load(category* cat_, std::string command_, ConnectionID conn_, Access access_, +LokiMQ::run_info& LokiMQ::run_info::load(category* cat_, std::string command_, ConnectionID conn_, Access access_, std::string remote_, std::vector data_parts_, const std::pair* callback_) { is_batch_job = false; is_reply_job = false; @@ -349,6 +349,7 @@ LokiMQ::run_info& LokiMQ::run_info::load(category* cat_, std::string command_, C command = std::move(command_); conn = std::move(conn_); access = std::move(access_); + remote = std::move(remote_); data_parts = std::move(data_parts_); callback = callback_; return *this; @@ -356,7 +357,7 @@ LokiMQ::run_info& LokiMQ::run_info::load(category* cat_, std::string command_, C LokiMQ::run_info& LokiMQ::run_info::load(pending_command&& pending) { return load(&pending.cat, std::move(pending.command), std::move(pending.conn), std::move(pending.access), - std::move(pending.data_parts), pending.callback); + std::move(pending.remote), std::move(pending.data_parts), pending.callback); } LokiMQ::run_info& LokiMQ::run_info::load(batch_job&& bj, bool reply_job) { diff --git a/lokimq/lokimq.h b/lokimq/lokimq.h index fa8eced..8b161b4 100644 --- a/lokimq/lokimq.h +++ b/lokimq/lokimq.h @@ -136,7 +136,8 @@ public: /// Callback type invoked to determine whether the given new incoming connection is allowed to /// connect to us and to set its authentication level. /// - /// @param ip - the ip address of the incoming connection + /// @param address - the address of the incoming connection. For TCP connections this is an IP + /// address; for UDP connections it's a string such as "localhost:UID:GID:PID". /// @param pubkey - the x25519 pubkey of the connecting client (32 byte string). Note that this /// will only be non-empty for incoming connections on `listen_curve` sockets; `listen_plain` /// sockets do not have a pubkey. @@ -145,7 +146,7 @@ public: /// /// @returns an `AuthLevel` enum value indicating the default auth level for the incoming /// connection, or AuthLevel::denied if the connection should be refused. - using AllowFunc = std::function; + using AllowFunc = std::function; /// Callback that is invoked when we need to send a "strong" message to a SN that we aren't /// already connected to and need to establish a connection. This callback returns the ZMQ @@ -587,11 +588,12 @@ private: const std::pair* callback; ConnectionID conn; Access access; + std::string remote; pending_command(category& cat, std::string command, std::vector data_parts, - const std::pair* callback, ConnectionID conn, Access access) + const std::pair* callback, ConnectionID conn, Access access, std::string remote) : cat{cat}, command{std::move(command)}, data_parts{std::move(data_parts)}, - callback{callback}, conn{std::move(conn)}, access{std::move(access)} {} + callback{callback}, conn{std::move(conn)}, access{std::move(access)}, remote{std::move(remote)} {} }; std::list pending_commands; @@ -612,6 +614,7 @@ private: std::string command; ConnectionID conn; // The connection (or SN pubkey) to reply on/to. Access access; // The access level of the invoker (actual level, can be higher than the command's requirement) + std::string remote; // The remote address from which we received the request. std::string conn_route; // if non-empty this is the reply routing prefix (for incoming connections) std::vector data_parts; @@ -629,7 +632,7 @@ private: std::string worker_routing_id; // "w123" where 123 == worker_id /// Loads the run info with an incoming command - run_info& load(category* cat, std::string command, ConnectionID conn, Access access, + run_info& load(category* cat, std::string command, ConnectionID conn, Access access, std::string remote, std::vector data_parts, const std::pair* callback); /// Loads the run info with a stored pending command diff --git a/lokimq/message.h b/lokimq/message.h index 2f55a65..272e78b 100644 --- a/lokimq/message.h +++ b/lokimq/message.h @@ -16,9 +16,11 @@ public: ConnectionID conn; ///< The connection info for routing a reply; also contains the pubkey/sn status. std::string reply_tag; ///< If the invoked command is a request command this is the required reply tag that will be prepended by `send_reply()`. Access access; ///< The access level of the invoker. This can be higher than the access level of the command, for example for an admin invoking a basic command. + std::string remote; ///< Some sort of remote address from which the request came. Often "IP" for TCP connections and "localhost:UID:GID:PID" for UDP connections. /// Constructor - Message(LokiMQ& lmq, ConnectionID cid, Access access) : lokimq{lmq}, conn{std::move(cid)}, access{std::move(access)} {} + Message(LokiMQ& lmq, ConnectionID cid, Access access, std::string remote) + : lokimq{lmq}, conn{std::move(cid)}, access{std::move(access)}, remote{std::move(remote)} {} // Non-copyable Message(const Message&) = delete; diff --git a/lokimq/worker.cpp b/lokimq/worker.cpp index ce49b4d..e2df15c 100644 --- a/lokimq/worker.cpp +++ b/lokimq/worker.cpp @@ -21,7 +21,7 @@ void LokiMQ::worker_thread(unsigned int index) { LMQ_LOG(debug, "New worker thread ", worker_id, " started"); sock.connect(SN_ADDR_WORKERS); - Message message{*this, 0, AuthLevel::none}; + Message message{*this, 0, AuthLevel::none, ""s}; std::vector parts; run_info& run = workers[index]; // This contains our first job, and will be updated later with subsequent jobs @@ -38,9 +38,10 @@ void LokiMQ::worker_thread(unsigned int index) { } else { message.conn = run.conn; message.access = run.access; + message.remote = std::move(run.remote); message.data.clear(); - LMQ_TRACE("Got incoming command from ", message.conn, message.conn.route.empty() ? "(outgoing)" : "(incoming)"); + LMQ_TRACE("Got incoming command from ", message.remote, "/", message.conn, message.conn.route.empty() ? " (outgoing)" : " (incoming)"); if (run.callback->second /*is_request*/) { message.reply_tag = {run.data_parts[0].data(), run.data_parts[0].size()}; @@ -267,7 +268,8 @@ void LokiMQ::proxy_to_worker(size_t conn_index, std::vector& par LMQ_LOG(debug, "No available free workers, queuing ", command, " for later"); ConnectionID conn{peer->service_node ? ConnectionID::SN_ID : conn_index_to_id[conn_index].id, peer->pubkey, std::move(tmp_peer.route)}; - pending_commands.emplace_back(category, std::move(command), std::move(data_parts), cat_call.second, std::move(conn), std::move(access)); + pending_commands.emplace_back(category, std::move(command), std::move(data_parts), cat_call.second, + std::move(conn), std::move(access), peer_address(parts[command_part_index])); category.queued++; return; } @@ -283,7 +285,8 @@ void LokiMQ::proxy_to_worker(size_t conn_index, std::vector& par c.route = std::move(tmp_peer.route); if (outgoing || peer->service_node) tmp_peer.route.clear(); - run.load(&category, std::move(command), std::move(c), std::move(access), std::move(data_parts), cat_call.second); + run.load(&category, std::move(command), std::move(c), std::move(access), peer_address(parts[command_part_index]), + std::move(data_parts), cat_call.second); } if (outgoing)