diff --git a/lokimq/lokimq.cpp b/lokimq/lokimq.cpp index 6d238b0..248759e 100644 --- a/lokimq/lokimq.cpp +++ b/lokimq/lokimq.cpp @@ -468,10 +468,9 @@ void LokiMQ::worker_thread(unsigned int index) { } } else { message.conn = run.conn; - message.route = run.conn_route; message.data.clear(); - LMQ_TRACE("Got incoming command from ", message.conn, message.route.empty() ? "(outgoing)" : "(incoming)"); + LMQ_TRACE("Got incoming command from ", message.conn, message.conn.route.empty() ? "(outgoing)" : "(incoming)"); if (run.callback->second /*is_request*/) { message.reply_tag = {run.data_parts[0].data(), run.data_parts[0].size()}; @@ -660,7 +659,6 @@ void LokiMQ::proxy_send(bt_dict_consumer data) { bool request = false; bool have_conn_id = false; ConnectionID conn_id; - string_view conn_route; std::string request_tag; std::unique_ptr request_cbptr; @@ -670,8 +668,15 @@ void LokiMQ::proxy_send(bt_dict_consumer data) { throw std::runtime_error("Invalid error: invalid conn_id value (-1)"); have_conn_id = true; } + if (data.skip_until("conn_pubkey")) { + if (have_conn_id) + throw std::runtime_error("Internal error: Invalid proxy send command; conn_id and conn_pubkey are exclusive"); + conn_id.pk = data.consume_string(); + conn_id.id = ConnectionID::SN_ID; + } else if (!have_conn_id) + throw std::runtime_error("Internal error: Invalid proxy send command; conn_pubkey or conn_id missing"); if (data.skip_until("conn_route")) - conn_route = data.consume_string_view(); + conn_id.route = data.consume_string(); if (data.skip_until("hint")) hint = data.consume_string_view(); if (data.skip_until("incoming")) @@ -680,13 +685,6 @@ void LokiMQ::proxy_send(bt_dict_consumer data) { keep_alive = std::chrono::milliseconds{data.consume_integer()}; if (data.skip_until("optional")) optional = data.consume_integer(); - if (data.skip_until("pubkey")) { - if (have_conn_id) - throw std::runtime_error("Internal error: Invalid proxy send command; conn_id and pubkey are exclusive"); - conn_id.pk = data.consume_string(); - conn_id.id = ConnectionID::SN_ID; - } else if (!have_conn_id) - throw std::runtime_error("Internal error: Invalid proxy send command; pubkey or conn_id missing"); if (data.skip_until("request")) request = data.consume_integer(); @@ -703,7 +701,6 @@ void LokiMQ::proxy_send(bt_dict_consumer data) { bt_list_consumer send = data.consume_list_consumer(); zmq::socket_t *send_to; - std::string routing_prefix; if (conn_id.sn()) { auto sock_route = proxy_connect_sn(conn_id.pk, hint, optional, incoming, keep_alive); if (!sock_route.first) { @@ -715,19 +712,18 @@ void LokiMQ::proxy_send(bt_dict_consumer data) { return; } send_to = sock_route.first; - routing_prefix = std::move(sock_route.second); - } else if (!conn_route.empty()) { // incoming non-SN connection + conn_id.route = std::move(sock_route.second); + } else if (!conn_id.route.empty()) { // incoming non-SN connection auto it = incoming_conn_index.find(conn_id); if (it == incoming_conn_index.end()) { LMQ_LOG(warn, "Unable to send to ", conn_id, ": incoming listening socket not found"); return; } send_to = &connections[it->second]; - routing_prefix = std::string{conn_route}; } else { auto pr = peers.equal_range(conn_id); if (pr.first == peers.end()) { - LMQ_LOG(warn, "Unable to send: connection id ", conn_id, " is not (or is no longer) a valid connection"); + LMQ_LOG(warn, "Unable to send: connection id ", conn_id, " is not (or is no longer) a valid outgoing connection"); return; } auto& peer = pr.first->second; @@ -741,9 +737,9 @@ void LokiMQ::proxy_send(bt_dict_consumer data) { } try { - send_message_parts(*send_to, build_send_parts(send, routing_prefix)); + send_message_parts(*send_to, build_send_parts(send, conn_id.route)); } catch (const zmq::error_t &e) { - if (e.num() == EHOSTUNREACH && !routing_prefix.empty() /*= incoming conn*/) { + if (e.num() == EHOSTUNREACH && !conn_id.route.empty() /*= incoming conn*/) { LMQ_LOG(debug, "Incoming connection is no longer valid; removing peer details"); // Our incoming connection no longer exists; remove it from `peers`. auto pr = peers.equal_range(conn_id); @@ -754,7 +750,7 @@ void LokiMQ::proxy_send(bt_dict_consumer data) { bool removed; for (auto it = pr.first; it != pr.second; ) { auto& peer = it->second; - if (peer.route == routing_prefix) { + if (peer.route == conn_id.route) { peers.erase(it); removed = true; break; @@ -782,13 +778,13 @@ void LokiMQ::proxy_reply(bt_dict_consumer data) { throw std::runtime_error("Invalid error: invalid conn_id value (-1)"); have_conn_id = true; } - if (data.skip_until("pubkey")) { + if (data.skip_until("conn_pubkey")) { if (have_conn_id) - throw std::runtime_error("Internal error: Invalid proxy send command; conn_id and pubkey are exclusive"); + throw std::runtime_error("Internal error: Invalid proxy reply command; conn_id and conn_pubkey are exclusive"); conn_id.pk = data.consume_string(); conn_id.id = ConnectionID::SN_ID; } else if (!have_conn_id) - throw std::runtime_error("Internal error: Invalid proxy send command; pubkey or conn_id missing"); + throw std::runtime_error("Internal error: Invalid proxy reply command; conn_pubkey or conn_id missing"); if (!data.skip_until("send")) throw std::runtime_error("Internal error: Invalid proxy reply command; send parts missing"); @@ -1416,14 +1412,13 @@ void LokiMQ::set_general_threads(int threads) { general_workers = threads; } -LokiMQ::run_info& LokiMQ::run_info::load(category* cat_, std::string command_, ConnectionID conn_, std::string route_, +LokiMQ::run_info& LokiMQ::run_info::load(category* cat_, std::string command_, ConnectionID conn_, std::vector data_parts_, const std::pair* callback_) { is_batch_job = false; is_reply_job = false; cat = cat_; command = std::move(command_); conn = std::move(conn_); - conn_route = std::move(route_); data_parts = std::move(data_parts_); callback = callback_; return *this; @@ -1431,7 +1426,7 @@ LokiMQ::run_info& LokiMQ::run_info::load(category* cat_, std::string command_, C LokiMQ::run_info& LokiMQ::run_info::load(pending_command&& pending) { return load(&pending.cat, std::move(pending.command), std::move(pending.conn), - std::move(pending.conn_route), std::move(pending.data_parts), pending.callback); + std::move(pending.data_parts), pending.callback); } LokiMQ::run_info& LokiMQ::run_info::load(batch_job&& bj, bool reply_job) { @@ -1555,8 +1550,8 @@ void LokiMQ::proxy_to_worker(size_t conn_index, std::vector& par } LMQ_LOG(debug, "No available free workers, queuing ", command, " for later"); - ConnectionID conn{peer->service_node ? ConnectionID::SN_ID : conn_index_to_id[conn_index].id, peer->pubkey}; - pending_commands.emplace_back(category, std::move(command), std::move(data_parts), cat_call.second, std::move(conn), tmp_peer.route); + ConnectionID conn{peer->service_node ? ConnectionID::SN_ID : conn_index_to_id[conn_index].id, peer->pubkey, std::move(tmp_peer.route)}; + pending_commands.emplace_back(category, std::move(command), std::move(data_parts), cat_call.second, std::move(conn)); category.queued++; return; } @@ -1569,10 +1564,10 @@ void LokiMQ::proxy_to_worker(size_t conn_index, std::vector& par auto& run = get_idle_worker(); { ConnectionID c{peer->service_node ? ConnectionID::SN_ID : conn_index_to_id[conn_index].id, peer->pubkey}; + c.route = std::move(tmp_peer.route); if (outgoing || peer->service_node) tmp_peer.route.clear(); - run.load(&category, std::move(command), std::move(c), std::move(tmp_peer.route), - std::move(data_parts), cat_call.second); + run.load(&category, std::move(command), std::move(c), std::move(data_parts), cat_call.second); } if (outgoing) diff --git a/lokimq/lokimq.h b/lokimq/lokimq.h index 116fbc5..7a4e29f 100644 --- a/lokimq/lokimq.h +++ b/lokimq/lokimq.h @@ -102,8 +102,14 @@ struct ConnectionID { ConnectionID& operator=(const ConnectionID&) = default; ConnectionID& operator=(ConnectionID&&) = default; + // Returns true if this is a ConnectionID (false for a default-constructed, invalid id) + explicit operator bool() const { + return id != 0; + } + // Two ConnectionIDs are equal if they are both SNs and have matching pubkeys, or they are both - // not SNs and have matching internal IDs. (Pubkeys do not have to match for non-SNs). + // not SNs and have matching internal IDs. (Pubkeys do not have to match for non-SNs, and + // routes are not considered for equality at all). bool operator==(const ConnectionID &o) const { if (id == SN_ID && o.id == SN_ID) return pk == o.pk; @@ -127,14 +133,17 @@ struct ConnectionID { ConnectionID() : ConnectionID(0) {} private: ConnectionID(long long id) : id{id} {} - ConnectionID(long long id, std::string pubkey) : id{id}, pk{std::move(pubkey)} {} + ConnectionID(long long id, std::string pubkey, std::string route = "") + : id{id}, pk{std::move(pubkey)}, route{std::move(route)} {} constexpr static long long SN_ID = -1; long long id = 0; std::string pk; + std::string route; friend class LokiMQ; friend class std::hash; - + template + friend bt_dict build_send(ConnectionID to, string_view cmd, const T&... opts); friend std::ostream& operator<<(std::ostream& o, const ConnectionID& conn); }; @@ -158,7 +167,6 @@ public: LokiMQ& lokimq; ///< The owning LokiMQ object std::vector data; ///< The provided command data parts, if any. ConnectionID conn; ///< The connection info for routing a reply; also contains the pubkey/sn status. - std::string route; ///< The return route for a reply (if the message was on an incoming conn) std::string reply_tag; ///< If the invoked command is a request command this is the required reply tag that will be prepended by `send_reply()`. /// Constructor @@ -650,12 +658,11 @@ private: std::vector data_parts; const std::pair* callback; ConnectionID conn; - std::string conn_route; pending_command(category& cat, std::string command, std::vector data_parts, - const std::pair* callback, ConnectionID conn, std::string conn_route) + const std::pair* callback, ConnectionID conn) : cat{cat}, command{std::move(command)}, data_parts{std::move(data_parts)}, - callback{callback}, conn{std::move(conn)}, conn_route{std::move(conn_route)} {} + callback{callback}, conn{std::move(conn)} {} }; std::list pending_commands; @@ -692,7 +699,7 @@ private: std::string worker_routing_id; // "w123" where 123 == worker_id /// Loads the run info with an incoming command - run_info& load(category* cat, std::string command, ConnectionID conn, std::string route, + run_info& load(category* cat, std::string command, ConnectionID conn, std::vector data_parts, const std::pair* callback); /// Loads the run info with a stored pending command @@ -1115,14 +1122,6 @@ struct keep_alive { explicit keep_alive(std::chrono::milliseconds time) : time{std::move(time)} {} }; -/// Specifies a routing prefix to be used. This option is required (and added automatically by -/// Message::send and ::reply) when a message is being sent to a non-SN connection on a listening -/// socket, and has no effect otherwise. -struct route { - std::string routing_prefix; - explicit route(std::string r) : routing_prefix{std::move(r)} {} -}; - } namespace detail { @@ -1163,18 +1162,12 @@ inline void apply_send_option(bt_list&, bt_dict& control_data, const send_option control_data["keep-alive"] = timeout.time.count(); } -/// `route` specialization: adds a routing prefix to be used when sending a non-SN message on an -/// incoming socket. -inline void apply_send_option(bt_list&, bt_dict& control_data, const send_option::route& route) { - control_data["conn_route"] = route.routing_prefix; -} - } // namespace detail template -void LokiMQ::send(ConnectionID to, string_view cmd, const T &...opts) { +bt_dict build_send(ConnectionID to, string_view cmd, const T&... opts) { bt_dict control_data; - bt_list parts{{std::move(cmd)}}; + bt_list parts{{cmd}}; #ifdef __cpp_fold_expressions (detail::apply_send_option(parts, control_data, opts),...); #else @@ -1182,56 +1175,52 @@ void LokiMQ::send(ConnectionID to, string_view cmd, const T &...opts) { #endif if (to.sn()) - control_data["pubkey"] = std::move(to.pk); - else + control_data["conn_pubkey"] = std::move(to.pk); + else { control_data["conn_id"] = to.id; + control_data["conn_route"] = std::move(to.route); + } control_data["send"] = std::move(parts); - detail::send_control(get_control_socket(), "SEND", bt_serialize(control_data)); + return control_data; + +} + +template +void LokiMQ::send(ConnectionID to, string_view cmd, const T&... opts) { + detail::send_control(get_control_socket(), "SEND", + bt_serialize(build_send(std::move(to), cmd, opts...))); } std::string make_random_string(size_t size); template void LokiMQ::request(ConnectionID to, string_view cmd, ReplyCallback callback, const T &...opts) { - auto reply_tag = make_random_string(15); // 15 random bytes is lots and should keep us in most stl implementations' small string optimization - bt_dict control_data; - bt_list parts{{std::move(cmd), reply_tag}}; -#ifdef __cpp_fold_expressions - (detail::apply_send_option(parts, control_data, opts),...); -#else - (void) std::initializer_list{(detail::apply_send_option(parts, control_data, opts), 0)...}; -#endif - - if (to.sn()) - control_data["pubkey"] = std::move(to.pk); - else - control_data["conn_id"] = to.id; - - control_data["send"] = std::move(parts); + const auto reply_tag = make_random_string(15); // 15 random bytes is lots and should keep us in most stl implementations' small string optimization + bt_dict control_data = build_send(std::move(to), cmd, reply_tag, opts...); control_data["request"] = true; control_data["request_callback"] = reinterpret_cast(new ReplyCallback{std::move(callback)}); - control_data["request_tag"] = std::move(reply_tag); - detail::send_control(get_control_socket(), "SEND", bt_serialize(control_data)); + control_data["request_tag"] = string_view{reply_tag}; + detail::send_control(get_control_socket(), "SEND", bt_serialize(std::move(control_data))); } template void Message::send_back(string_view command, Args&&... args) { if (conn.sn()) lokimq.send(conn, command, std::forward(args)...); - else lokimq.send(conn, command, send_option::route{route}, send_option::optional{}, std::forward(args)...); + else lokimq.send(conn, command, send_option::optional{}, std::forward(args)...); } template void Message::send_reply(Args&&... args) { assert(!reply_tag.empty()); if (conn.sn()) lokimq.send(conn, "REPLY", reply_tag, std::forward(args)...); - else lokimq.send(conn, "REPLY", reply_tag, send_option::route{route}, send_option::optional{}, std::forward(args)...); + else lokimq.send(conn, "REPLY", reply_tag, send_option::optional{}, std::forward(args)...); } template void Message::send_request(string_view cmd, ReplyCallback&& callback, Args&&... args) { if (conn.sn()) lokimq.request(conn, cmd, std::forward(callback), std::forward(args)...); else lokimq.request(conn, cmd, std::forward(callback), - send_option::route{route}, send_option::optional{}, std::forward(args)...); + send_option::optional{}, std::forward(args)...); } template