mirror of https://github.com/oxen-io/oxen-mq.git
Add remote address into Message object
Can be useful for end point logging.
This commit is contained in:
parent
a812abd422
commit
f4f1506df0
|
@ -341,7 +341,7 @@ void LokiMQ::set_general_threads(int threads) {
|
||||||
general_workers = threads;
|
general_workers = threads;
|
||||||
}
|
}
|
||||||
|
|
||||||
LokiMQ::run_info& LokiMQ::run_info::load(category* cat_, std::string command_, ConnectionID conn_, Access access_,
|
LokiMQ::run_info& LokiMQ::run_info::load(category* cat_, std::string command_, ConnectionID conn_, Access access_, std::string remote_,
|
||||||
std::vector<zmq::message_t> data_parts_, const std::pair<CommandCallback, bool>* callback_) {
|
std::vector<zmq::message_t> data_parts_, const std::pair<CommandCallback, bool>* callback_) {
|
||||||
is_batch_job = false;
|
is_batch_job = false;
|
||||||
is_reply_job = false;
|
is_reply_job = false;
|
||||||
|
@ -349,6 +349,7 @@ LokiMQ::run_info& LokiMQ::run_info::load(category* cat_, std::string command_, C
|
||||||
command = std::move(command_);
|
command = std::move(command_);
|
||||||
conn = std::move(conn_);
|
conn = std::move(conn_);
|
||||||
access = std::move(access_);
|
access = std::move(access_);
|
||||||
|
remote = std::move(remote_);
|
||||||
data_parts = std::move(data_parts_);
|
data_parts = std::move(data_parts_);
|
||||||
callback = callback_;
|
callback = callback_;
|
||||||
return *this;
|
return *this;
|
||||||
|
@ -356,7 +357,7 @@ LokiMQ::run_info& LokiMQ::run_info::load(category* cat_, std::string command_, C
|
||||||
|
|
||||||
LokiMQ::run_info& LokiMQ::run_info::load(pending_command&& pending) {
|
LokiMQ::run_info& LokiMQ::run_info::load(pending_command&& pending) {
|
||||||
return load(&pending.cat, std::move(pending.command), std::move(pending.conn), std::move(pending.access),
|
return load(&pending.cat, std::move(pending.command), std::move(pending.conn), std::move(pending.access),
|
||||||
std::move(pending.data_parts), pending.callback);
|
std::move(pending.remote), std::move(pending.data_parts), pending.callback);
|
||||||
}
|
}
|
||||||
|
|
||||||
LokiMQ::run_info& LokiMQ::run_info::load(batch_job&& bj, bool reply_job) {
|
LokiMQ::run_info& LokiMQ::run_info::load(batch_job&& bj, bool reply_job) {
|
||||||
|
|
|
@ -136,7 +136,8 @@ public:
|
||||||
/// Callback type invoked to determine whether the given new incoming connection is allowed to
|
/// Callback type invoked to determine whether the given new incoming connection is allowed to
|
||||||
/// connect to us and to set its authentication level.
|
/// connect to us and to set its authentication level.
|
||||||
///
|
///
|
||||||
/// @param ip - the ip address of the incoming connection
|
/// @param address - the address of the incoming connection. For TCP connections this is an IP
|
||||||
|
/// address; for UDP connections it's a string such as "localhost:UID:GID:PID".
|
||||||
/// @param pubkey - the x25519 pubkey of the connecting client (32 byte string). Note that this
|
/// @param pubkey - the x25519 pubkey of the connecting client (32 byte string). Note that this
|
||||||
/// will only be non-empty for incoming connections on `listen_curve` sockets; `listen_plain`
|
/// will only be non-empty for incoming connections on `listen_curve` sockets; `listen_plain`
|
||||||
/// sockets do not have a pubkey.
|
/// sockets do not have a pubkey.
|
||||||
|
@ -145,7 +146,7 @@ public:
|
||||||
///
|
///
|
||||||
/// @returns an `AuthLevel` enum value indicating the default auth level for the incoming
|
/// @returns an `AuthLevel` enum value indicating the default auth level for the incoming
|
||||||
/// connection, or AuthLevel::denied if the connection should be refused.
|
/// connection, or AuthLevel::denied if the connection should be refused.
|
||||||
using AllowFunc = std::function<AuthLevel(string_view ip, string_view pubkey, bool service_node)>;
|
using AllowFunc = std::function<AuthLevel(string_view address, string_view pubkey, bool service_node)>;
|
||||||
|
|
||||||
/// Callback that is invoked when we need to send a "strong" message to a SN that we aren't
|
/// Callback that is invoked when we need to send a "strong" message to a SN that we aren't
|
||||||
/// already connected to and need to establish a connection. This callback returns the ZMQ
|
/// already connected to and need to establish a connection. This callback returns the ZMQ
|
||||||
|
@ -587,11 +588,12 @@ private:
|
||||||
const std::pair<CommandCallback, bool>* callback;
|
const std::pair<CommandCallback, bool>* callback;
|
||||||
ConnectionID conn;
|
ConnectionID conn;
|
||||||
Access access;
|
Access access;
|
||||||
|
std::string remote;
|
||||||
|
|
||||||
pending_command(category& cat, std::string command, std::vector<zmq::message_t> data_parts,
|
pending_command(category& cat, std::string command, std::vector<zmq::message_t> data_parts,
|
||||||
const std::pair<CommandCallback, bool>* callback, ConnectionID conn, Access access)
|
const std::pair<CommandCallback, bool>* callback, ConnectionID conn, Access access, std::string remote)
|
||||||
: cat{cat}, command{std::move(command)}, data_parts{std::move(data_parts)},
|
: cat{cat}, command{std::move(command)}, data_parts{std::move(data_parts)},
|
||||||
callback{callback}, conn{std::move(conn)}, access{std::move(access)} {}
|
callback{callback}, conn{std::move(conn)}, access{std::move(access)}, remote{std::move(remote)} {}
|
||||||
};
|
};
|
||||||
std::list<pending_command> pending_commands;
|
std::list<pending_command> pending_commands;
|
||||||
|
|
||||||
|
@ -612,6 +614,7 @@ private:
|
||||||
std::string command;
|
std::string command;
|
||||||
ConnectionID conn; // The connection (or SN pubkey) to reply on/to.
|
ConnectionID conn; // The connection (or SN pubkey) to reply on/to.
|
||||||
Access access; // The access level of the invoker (actual level, can be higher than the command's requirement)
|
Access access; // The access level of the invoker (actual level, can be higher than the command's requirement)
|
||||||
|
std::string remote; // The remote address from which we received the request.
|
||||||
std::string conn_route; // if non-empty this is the reply routing prefix (for incoming connections)
|
std::string conn_route; // if non-empty this is the reply routing prefix (for incoming connections)
|
||||||
std::vector<zmq::message_t> data_parts;
|
std::vector<zmq::message_t> data_parts;
|
||||||
|
|
||||||
|
@ -629,7 +632,7 @@ private:
|
||||||
std::string worker_routing_id; // "w123" where 123 == worker_id
|
std::string worker_routing_id; // "w123" where 123 == worker_id
|
||||||
|
|
||||||
/// Loads the run info with an incoming command
|
/// Loads the run info with an incoming command
|
||||||
run_info& load(category* cat, std::string command, ConnectionID conn, Access access,
|
run_info& load(category* cat, std::string command, ConnectionID conn, Access access, std::string remote,
|
||||||
std::vector<zmq::message_t> data_parts, const std::pair<CommandCallback, bool>* callback);
|
std::vector<zmq::message_t> data_parts, const std::pair<CommandCallback, bool>* callback);
|
||||||
|
|
||||||
/// Loads the run info with a stored pending command
|
/// Loads the run info with a stored pending command
|
||||||
|
|
|
@ -16,9 +16,11 @@ public:
|
||||||
ConnectionID conn; ///< The connection info for routing a reply; also contains the pubkey/sn status.
|
ConnectionID conn; ///< The connection info for routing a reply; also contains the pubkey/sn status.
|
||||||
std::string reply_tag; ///< If the invoked command is a request command this is the required reply tag that will be prepended by `send_reply()`.
|
std::string reply_tag; ///< If the invoked command is a request command this is the required reply tag that will be prepended by `send_reply()`.
|
||||||
Access access; ///< The access level of the invoker. This can be higher than the access level of the command, for example for an admin invoking a basic command.
|
Access access; ///< The access level of the invoker. This can be higher than the access level of the command, for example for an admin invoking a basic command.
|
||||||
|
std::string remote; ///< Some sort of remote address from which the request came. Often "IP" for TCP connections and "localhost:UID:GID:PID" for UDP connections.
|
||||||
|
|
||||||
/// Constructor
|
/// Constructor
|
||||||
Message(LokiMQ& lmq, ConnectionID cid, Access access) : lokimq{lmq}, conn{std::move(cid)}, access{std::move(access)} {}
|
Message(LokiMQ& lmq, ConnectionID cid, Access access, std::string remote)
|
||||||
|
: lokimq{lmq}, conn{std::move(cid)}, access{std::move(access)}, remote{std::move(remote)} {}
|
||||||
|
|
||||||
// Non-copyable
|
// Non-copyable
|
||||||
Message(const Message&) = delete;
|
Message(const Message&) = delete;
|
||||||
|
|
|
@ -21,7 +21,7 @@ void LokiMQ::worker_thread(unsigned int index) {
|
||||||
LMQ_LOG(debug, "New worker thread ", worker_id, " started");
|
LMQ_LOG(debug, "New worker thread ", worker_id, " started");
|
||||||
sock.connect(SN_ADDR_WORKERS);
|
sock.connect(SN_ADDR_WORKERS);
|
||||||
|
|
||||||
Message message{*this, 0, AuthLevel::none};
|
Message message{*this, 0, AuthLevel::none, ""s};
|
||||||
std::vector<zmq::message_t> parts;
|
std::vector<zmq::message_t> parts;
|
||||||
run_info& run = workers[index]; // This contains our first job, and will be updated later with subsequent jobs
|
run_info& run = workers[index]; // This contains our first job, and will be updated later with subsequent jobs
|
||||||
|
|
||||||
|
@ -38,9 +38,10 @@ void LokiMQ::worker_thread(unsigned int index) {
|
||||||
} else {
|
} else {
|
||||||
message.conn = run.conn;
|
message.conn = run.conn;
|
||||||
message.access = run.access;
|
message.access = run.access;
|
||||||
|
message.remote = std::move(run.remote);
|
||||||
message.data.clear();
|
message.data.clear();
|
||||||
|
|
||||||
LMQ_TRACE("Got incoming command from ", message.conn, message.conn.route.empty() ? "(outgoing)" : "(incoming)");
|
LMQ_TRACE("Got incoming command from ", message.remote, "/", message.conn, message.conn.route.empty() ? " (outgoing)" : " (incoming)");
|
||||||
|
|
||||||
if (run.callback->second /*is_request*/) {
|
if (run.callback->second /*is_request*/) {
|
||||||
message.reply_tag = {run.data_parts[0].data<char>(), run.data_parts[0].size()};
|
message.reply_tag = {run.data_parts[0].data<char>(), run.data_parts[0].size()};
|
||||||
|
@ -267,7 +268,8 @@ void LokiMQ::proxy_to_worker(size_t conn_index, std::vector<zmq::message_t>& par
|
||||||
|
|
||||||
LMQ_LOG(debug, "No available free workers, queuing ", command, " for later");
|
LMQ_LOG(debug, "No available free workers, queuing ", command, " for later");
|
||||||
ConnectionID conn{peer->service_node ? ConnectionID::SN_ID : conn_index_to_id[conn_index].id, peer->pubkey, std::move(tmp_peer.route)};
|
ConnectionID conn{peer->service_node ? ConnectionID::SN_ID : conn_index_to_id[conn_index].id, peer->pubkey, std::move(tmp_peer.route)};
|
||||||
pending_commands.emplace_back(category, std::move(command), std::move(data_parts), cat_call.second, std::move(conn), std::move(access));
|
pending_commands.emplace_back(category, std::move(command), std::move(data_parts), cat_call.second,
|
||||||
|
std::move(conn), std::move(access), peer_address(parts[command_part_index]));
|
||||||
category.queued++;
|
category.queued++;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -283,7 +285,8 @@ void LokiMQ::proxy_to_worker(size_t conn_index, std::vector<zmq::message_t>& par
|
||||||
c.route = std::move(tmp_peer.route);
|
c.route = std::move(tmp_peer.route);
|
||||||
if (outgoing || peer->service_node)
|
if (outgoing || peer->service_node)
|
||||||
tmp_peer.route.clear();
|
tmp_peer.route.clear();
|
||||||
run.load(&category, std::move(command), std::move(c), std::move(access), std::move(data_parts), cat_call.second);
|
run.load(&category, std::move(command), std::move(c), std::move(access), peer_address(parts[command_part_index]),
|
||||||
|
std::move(data_parts), cat_call.second);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (outgoing)
|
if (outgoing)
|
||||||
|
|
Loading…
Reference in New Issue