diff --git a/lokimq/lokimq.cpp b/lokimq/lokimq.cpp index 6bd58c7..c797085 100644 --- a/lokimq/lokimq.cpp +++ b/lokimq/lokimq.cpp @@ -1,6 +1,8 @@ #include #include #include +#include + extern "C" { #include } @@ -102,6 +104,7 @@ void send_routed_message(zmq::socket_t &socket, std::string route, std::string m send_message_parts(socket, msgs.begin(), data.empty() ? std::prev(msgs.end()) : msgs.end()); } +// Sends some stuff to a socket directly. void send_direct_message(zmq::socket_t &socket, std::string msg, std::string data = {}) { std::array msgs{{create_message(std::move(msg))}}; if (!data.empty()) @@ -142,18 +145,23 @@ std::string zmtp_metadata(string_view key, string_view value) { return result; } -void check_not_started(const std::thread& proxy_thread) { +void check_started(const std::thread& proxy_thread, const std::string &verb) { + if (!proxy_thread.joinable()) + throw std::logic_error("Cannot " + verb + " before calling `start()`"); +} + +void check_not_started(const std::thread& proxy_thread, const std::string &verb) { if (proxy_thread.joinable()) - throw std::logic_error("Cannot add categories/commands/aliases after calling `start()`"); + throw std::logic_error("Cannot " + verb + " after calling `start()`"); } // Extracts and builds the "send" part of a message for proxy_send/proxy_reply -std::list build_send_parts(bt_dict &data, const std::string &route) { +std::list build_send_parts(bt_list_consumer send, string_view route) { std::list parts; if (!route.empty()) parts.push_back(create_message(route)); - for (auto &s : data.at("send").get()) - parts.push_back(create_message(std::move(s.get()))); + while (!send.is_finished()) + parts.push_back(create_message(send.consume_string())); return parts; } @@ -229,7 +237,7 @@ LogLevel LokiMQ::log_level() const { void LokiMQ::add_category(std::string name, Access access_level, unsigned int reserved_threads, int max_queue) { - check_not_started(proxy_thread); + check_not_started(proxy_thread, "add a category"); if 
(name.size() > MAX_CATEGORY_LENGTH) throw std::runtime_error("Invalid category name `" + name + "': name too long (> " + std::to_string(MAX_CATEGORY_LENGTH) + ")"); @@ -245,7 +253,7 @@ void LokiMQ::add_category(std::string name, Access access_level, unsigned int re } void LokiMQ::add_command(const std::string& category, std::string name, CommandCallback callback) { - check_not_started(proxy_thread); + check_not_started(proxy_thread, "add a command"); if (name.size() > MAX_COMMAND_LENGTH) throw std::runtime_error("Invalid command name `" + name + "': name too long (> " + std::to_string(MAX_COMMAND_LENGTH) + ")"); @@ -258,13 +266,18 @@ void LokiMQ::add_command(const std::string& category, std::string name, CommandC if (command_aliases.count(fullname)) throw std::runtime_error("Cannot add command `" + fullname + "': a command alias with that name is already defined"); - auto ins = catit->second.commands.emplace(std::move(name), std::move(callback)); + auto ins = catit->second.commands.insert({std::move(name), {std::move(callback), false}}); if (!ins.second) throw std::runtime_error("Cannot add command `" + fullname + "': that command already exists"); } +void LokiMQ::add_request_command(const std::string& category, std::string name, CommandCallback callback) { + add_command(category, name, std::move(callback)); + categories.at(category).commands.at(name).second = true; +} + void LokiMQ::add_command_alias(std::string from, std::string to) { - check_not_started(proxy_thread); + check_not_started(proxy_thread, "add a command alias"); if (from.empty()) throw std::runtime_error("Cannot add an alias for empty command"); @@ -299,6 +312,8 @@ std::mutex control_sockets_mutex; /// commands in a thread-safe manner. A mutex is only required here the first time a thread /// accesses the control socket. zmq::socket_t& LokiMQ::get_control_socket() { + assert(proxy_thread.joinable()); + // Maps the LokiMQ unique ID to a local thread command socket. 
static thread_local std::map> control_sockets; static thread_local std::pair> last{-1, nullptr}; @@ -338,7 +353,7 @@ LokiMQ::LokiMQ( AllowFunc allow, Logger logger) : object_id{next_id++}, pubkey{std::move(pubkey_)}, privkey{std::move(privkey_)}, local_service_node{service_node}, - bind{std::move(bind_)}, peer_lookup{std::move(lookup)}, allow_connection{std::move(allow)}, logger{logger}, + bind{std::move(bind_)}, peer_lookup{std::move(lookup)}, allow_connection{std::move(allow)}, logger{std::move(logger)}, poll_remote_offset{poll_internal_size + (bind.empty() ? 0 : 1)} { LMQ_LOG(trace, "Constructing listening LokiMQ, id=", object_id, ", this=", this); @@ -404,11 +419,7 @@ void LokiMQ::start() { void LokiMQ::worker_thread(unsigned int index) { std::string worker_id = "w" + std::to_string(index); zmq::socket_t sock{context, zmq::socket_type::dealer}; -#if ZMQ_VERSION >= ZMQ_MAKE_VERSION (4, 3, 0) sock.setsockopt(ZMQ_ROUTING_ID, worker_id.data(), worker_id.size()); -#else - sock.setsockopt(ZMQ_IDENTITY, worker_id.data(), worker_id.size()); -#endif LMQ_LOG(debug, "New worker thread ", worker_id, " started"); sock.connect(SN_ADDR_WORKERS); @@ -427,14 +438,21 @@ void LokiMQ::worker_thread(unsigned int index) { run.batch->job_completion(); } } else { - message.pubkey = {run.pubkey.data(), 32}; + message.pubkey = run.pubkey; message.service_node = run.service_node; message.data.clear(); - for (auto& m : run.data_parts) - message.data.emplace_back(m.data(), m.size()); + + if (run.callback->second /*is_request*/) { + message.reply_tag = {run.data_parts[0].data(), run.data_parts[0].size()}; + for (auto it = run.data_parts.begin() + 1; it != run.data_parts.end(); ++it) + message.data.emplace_back(it->data(), it->size()); + } else { + for (auto& m : run.data_parts) + message.data.emplace_back(m.data(), m.size()); + } LMQ_LOG(trace, "worker thread ", worker_id, " invoking ", run.command, " callback with ", message.data.size(), " message parts"); - (*run.callback)(message); + 
run.callback->first(message); } /* @@ -546,8 +564,19 @@ void LokiMQ::proxy_quit() { LMQ_LOG(debug, "Proxy thread teardown complete"); } +void LokiMQ::setup_outgoing_socket(zmq::socket_t& socket, string_view remote_pubkey) { + // FIXME: not passing a remote_pubkey is a problem + if (!remote_pubkey.empty()) + socket.setsockopt(ZMQ_CURVE_SERVERKEY, remote_pubkey.data(), remote_pubkey.size()); + socket.setsockopt(ZMQ_CURVE_PUBLICKEY, pubkey.data(), pubkey.size()); + socket.setsockopt(ZMQ_CURVE_SECRETKEY, privkey.data(), privkey.size()); + socket.setsockopt(ZMQ_HANDSHAKE_IVL, (int) HANDSHAKE_TIME.count()); + socket.setsockopt(ZMQ_MAXMSGSIZE, MAX_MSG_SIZE); + socket.setsockopt(ZMQ_ROUTING_ID, pubkey.data(), pubkey.size()); +} + std::pair -LokiMQ::proxy_connect(const std::string &remote, const std::string &connect_hint, bool optional, bool incoming_only, std::chrono::milliseconds keep_alive) { +LokiMQ::proxy_connect_sn(const std::string &remote, const std::string &connect_hint, bool optional, bool incoming_only, std::chrono::milliseconds keep_alive) { auto &peer = peers[remote]; // We may auto-vivify here, but that's okay; it'll get cleaned up in idle_expiry if no connection gets established std::pair result = {nullptr, ""s}; @@ -598,16 +627,7 @@ LokiMQ::proxy_connect(const std::string &remote, const std::string &connect_hint LMQ_LOG(debug, to_hex(pubkey), " connecting to ", addr, " to reach ", to_hex(remote)); zmq::socket_t socket{context, zmq::socket_type::dealer}; - socket.setsockopt(ZMQ_CURVE_SERVERKEY, remote.data(), remote.size()); - socket.setsockopt(ZMQ_CURVE_PUBLICKEY, pubkey.data(), pubkey.size()); - socket.setsockopt(ZMQ_CURVE_SECRETKEY, privkey.data(), privkey.size()); - socket.setsockopt(ZMQ_HANDSHAKE_IVL, SN_HANDSHAKE_TIME); - socket.setsockopt(ZMQ_MAXMSGSIZE, SN_ZMQ_MAX_MSG_SIZE); -#if ZMQ_VERSION >= ZMQ_MAKE_VERSION (4, 3, 0) - socket.setsockopt(ZMQ_ROUTING_ID, pubkey.data(), pubkey.size()); -#else - socket.setsockopt(ZMQ_IDENTITY, pubkey.data(), 
pubkey.size()); -#endif + setup_outgoing_socket(socket, remote); socket.connect(addr); peer.idle_expiry = keep_alive; @@ -621,7 +641,7 @@ LokiMQ::proxy_connect(const std::string &remote, const std::string &connect_hint return result; } -std::pair LokiMQ::proxy_connect(bt_dict &&data) { +std::pair LokiMQ::proxy_connect_sn(bt_dict &&data) { auto remote_pubkey = data.at("pubkey").get(); std::chrono::milliseconds keep_alive{get_int(data.at("keep-alive"))}; std::string hint; @@ -631,24 +651,44 @@ std::pair LokiMQ::proxy_connect(bt_dict &&data) { bool optional = data.count("optional"), incoming = data.count("incoming"); - return proxy_connect(remote_pubkey, hint, optional, incoming, keep_alive); + return proxy_connect_sn(remote_pubkey, hint, optional, incoming, keep_alive); } -void LokiMQ::proxy_send(bt_dict &&data) { - const auto &remote_pubkey = data.at("pubkey").get(); +void LokiMQ::proxy_send(bt_dict_consumer data) { + // NB: bt_dict_consumer goes in alphabetical order std::string hint; - auto hint_it = data.find("hint"); - if (hint_it != data.end()) - hint = hint_it->second.get(); + std::chrono::milliseconds keep_alive{DEFAULT_SEND_KEEP_ALIVE}; + bool optional = false; + bool incoming = false; + bool request = false; + std::string request_tag; + std::unique_ptr request_cbptr; + if (data.skip_until("hint")) + hint = data.consume_string(); + if (data.skip_until("incoming")) + incoming = data.consume_integer(); + if (data.skip_until("keep-alive")) + keep_alive = std::chrono::milliseconds{data.consume_integer()}; + if (data.skip_until("optional")) + optional = data.consume_integer(); + if (!data.skip_until("pubkey")) + throw std::runtime_error("Internal error: Invalid proxy send command; pubkey missing"); + std::string remote_pubkey = data.consume_string(); + if (data.skip_until("request")) + request = data.consume_integer(); + if (request) { + if (!data.skip_until("request_callback")) + throw std::runtime_error("Internal error: received request without 
request_callback"); + request_cbptr.reset(reinterpret_cast(data.consume_integer())); + if (!data.skip_until("request_tag")) + throw std::runtime_error("Internal error: received request without request_name"); + request_tag = data.consume_string(); + } + if (!data.skip_until("send")) + throw std::runtime_error("Internal error: Invalid proxy send command; send parts missing"); + bt_list_consumer send = data.consume_list_consumer(); - auto idle_it = data.find("keep-alive"); - std::chrono::milliseconds keep_alive = idle_it != data.end() - ? std::chrono::milliseconds{get_int(idle_it->second)} - : DEFAULT_SEND_KEEP_ALIVE; - - bool optional = data.count("optional"), incoming = data.count("incoming"); - - auto sock_route = proxy_connect(remote_pubkey, hint, optional, incoming, keep_alive); + auto sock_route = proxy_connect_sn(remote_pubkey, hint, optional, incoming, keep_alive); if (!sock_route.first) { if (optional) LMQ_LOG(debug, "Not sending: send is optional and no connection to ", to_hex(remote_pubkey), " is currently established"); @@ -656,8 +696,14 @@ void LokiMQ::proxy_send(bt_dict &&data) { LMQ_LOG(error, "Unable to send to ", to_hex(remote_pubkey), ": no connection could be established"); return; } + + if (request) { + pending_requests.insert({ request_tag, { + std::chrono::steady_clock::now() + REQUEST_TIMEOUT, std::move(*request_cbptr) }}); + } + try { - send_message_parts(*sock_route.first, build_send_parts(data, sock_route.second)); + send_message_parts(*sock_route.first, build_send_parts(send, sock_route.second)); } catch (const zmq::error_t &e) { if (e.num() == EHOSTUNREACH && sock_route.first == &listener && !sock_route.second.empty()) { // We *tried* to route via the incoming connection but it is no longer valid. 
Drop it, @@ -671,16 +717,25 @@ void LokiMQ::proxy_send(bt_dict &&data) { } } -void LokiMQ::proxy_reply(bt_dict &&data) { - const auto &route = data.at("route").get(); +void LokiMQ::proxy_reply(bt_dict_consumer data) { + // NB: bt_dict_consumer goes in alphabetical order + data.skip_until("route"); + string_view route = data.consume_string(); assert(!route.empty()); + if (!listener.connected()) { + // FIXME: this is wrong; we can reply to something even with no listener (e.g. if client + // says A, server replies B, client replies to that with C). LMQ_LOG(error, "Internal error: proxy_reply called but that shouldn't be possible as we have no listener!"); return; } + if (!data.skip_until("send")) + throw std::runtime_error("Internal error: Invalid proxy reply command; send parts missing"); + bt_list_consumer send = data.consume_list_consumer(); + try { - send_message_parts(listener, build_send_parts(data, route)); + send_message_parts(listener, build_send_parts(send, route)); } catch (const zmq::error_t &err) { if (err.num() == EHOSTUNREACH) { LMQ_LOG(info, "Unable to send reply to incoming non-SN request: remote is no longer connected"); @@ -690,8 +745,8 @@ void LokiMQ::proxy_reply(bt_dict &&data) { } } -void LokiMQ::proxy_batch(detail::Batch* batchptr) { - auto& batch = *batches.emplace(batchptr).first; +void LokiMQ::proxy_batch(detail::Batch* batch) { + batches.insert(batch); const int jobs = batch->size(); for (int i = 0; i < jobs; i++) batch_jobs.emplace(batch, i); @@ -735,18 +790,22 @@ void LokiMQ::proxy_control_message(std::vector& parts) { auto route = view(parts[0]), cmd = view(parts[1]); LMQ_LOG(trace, "control message: ", cmd); if (parts.size() == 3) { + LMQ_LOG(trace, "...: ", parts[2]); if (cmd == "SEND") { LMQ_LOG(trace, "proxying message"); - return proxy_send(bt_deserialize(view(parts[2]))); + return proxy_send(view(parts[2])); } else if (cmd == "REPLY") { LMQ_LOG(trace, "proxying reply to non-SN incoming message"); - return 
proxy_reply(bt_deserialize(view(parts[2]))); + return proxy_reply(view(parts[2])); } else if (cmd == "BATCH") { LMQ_LOG(trace, "proxy batch jobs"); auto ptrval = bt_deserialize(view(parts[2])); return proxy_batch(reinterpret_cast(ptrval)); - } else if (cmd == "CONNECT") { - proxy_connect(bt_deserialize(view(parts[2]))); + } else if (cmd == "CONNECT_SN") { + proxy_connect_sn(bt_deserialize(view(parts[2]))); + return; + } else if (cmd == "CONNECT_REMOTE") { + proxy_connect_remote(view(parts[2])); return; } else if (cmd == "TIMER") { return proxy_timer(view(parts[2])); @@ -771,20 +830,30 @@ void LokiMQ::proxy_control_message(std::vector& parts) { " (" + std::to_string(parts.size()) + ")"); } + +void LokiMQ::proxy_close_remote(int index, bool linger) { // Closes the socket at remotes[index], drops its pollitem, and renumbers every stored remote index above it. + remotes[index].second.setsockopt(ZMQ_LINGER, linger ? (int) std::chrono::milliseconds{CLOSE_LINGER}.count() : 0); // ZMQ_LINGER takes an int, so pass an int-sized value (same cast as used for ZMQ_HANDSHAKE_IVL) + pollitems.erase(pollitems.begin() + poll_remote_offset + index); + remotes.erase(remotes.begin() + index); + assert(pollitems.size() == remotes.size() + poll_remote_offset); // pollitems holds poll_remote_offset non-remote entries followed by one entry per remote + + for (auto& p : peers) + if (p.second.outgoing > index) + --p.second.outgoing; + + for (auto& pc : pending_connects) { + auto& i = std::get(pc); + if (i > index) + --i; + } +} + auto LokiMQ::proxy_close_outgoing(decltype(peers)::iterator it) -> decltype(it) { auto &peer = *it; auto &info = peer.second; if (info.outgoing >= 0) { - remotes[info.outgoing].second.setsockopt(ZMQ_LINGER, std::chrono::milliseconds{CLOSE_LINGER}.count()); - pollitems.erase(pollitems.begin() + poll_remote_offset + info.outgoing); - remotes.erase(remotes.begin() + info.outgoing); - assert(remotes.size() == pollitems.size() + poll_remote_offset); - - for (auto &p : peers) - if (p.second.outgoing > info.outgoing) - --p.second.outgoing; - + proxy_close_remote(info.outgoing); info.outgoing = -1; } @@ -822,6 +891,42 @@ void LokiMQ::proxy_expire_idle_peers() { } } +void LokiMQ::proxy_conn_cleanup() { + // Drop idle connections (if we haven't done it in a while) but 
*only* if we have some idle + // general workers: if we don't have any idle workers then we may still have incoming messages which + // we haven't processed yet and those messages might end up resetting the last activity time. + if (workers.size() < general_workers) { + LMQ_LOG(trace, "closing idle connections"); + proxy_expire_idle_peers(); + } + + auto now = std::chrono::steady_clock::now(); + + // Check any pending outgoing connections for timeout + for (auto it = pending_connects.begin(); it != pending_connects.end(); ) { + auto& pc = *it; + if (std::get<1>(pc) < now) { + job([callback = std::move(std::get<3>(pc))] { callback("connection attempt timed out"); }); + int index = std::get<0>(pc); + it = pending_connects.erase(it); + proxy_close_remote(index, false /*linger*/); + } else { + ++it; + } + } + + // Remove any expired pending requests and schedule their callback with a failure + for (auto it = pending_requests.begin(); it != pending_requests.end(); ) { + auto& callback = it->second; + if (callback.first < now) { + job([callback = std::move(callback.second)] { callback(false, {}); }); + it = pending_requests.erase(it); + } else { + ++it; + } + } +}; + void LokiMQ::proxy_loop() { zmq::socket_t zap_auth{context, zmq::socket_type::rep}; zap_auth.setsockopt(ZMQ_LINGER, 0); @@ -839,9 +944,10 @@ void LokiMQ::proxy_loop() { } if (log_level() >= LogLevel::trace) { - LMQ_LOG(trace, "Reserving space for ", max_workers, " max workers = ", general_workers, " general + category reserved:"); + LMQ_LOG(trace, "Reserving space for ", max_workers, " max workers = ", general_workers, " general + reserved:"); for (const auto& cat : categories) LMQ_LOG(trace, " - ", cat.first, ": ", cat.second.reserved_threads); + LMQ_LOG(trace, " - (batch jobs): ", batch_jobs_reserved); } workers.reserve(max_workers); @@ -860,7 +966,8 @@ void LokiMQ::proxy_loop() { listener.setsockopt(ZMQ_CURVE_SERVER, 1); listener.setsockopt(ZMQ_CURVE_PUBLICKEY, pubkey.data(), pubkey.size()); 
listener.setsockopt(ZMQ_CURVE_SECRETKEY, privkey.data(), privkey.size()); - listener.setsockopt(ZMQ_MAXMSGSIZE, SN_ZMQ_MAX_MSG_SIZE); + listener.setsockopt(ZMQ_HANDSHAKE_IVL, (int) HANDSHAKE_TIME.count()); + listener.setsockopt(ZMQ_MAXMSGSIZE, MAX_MSG_SIZE); listener.setsockopt(ZMQ_ROUTER_HANDOVER, 1); listener.setsockopt(ZMQ_ROUTER_MANDATORY, 1); @@ -879,11 +986,19 @@ void LokiMQ::proxy_loop() { assert(pollitems.size() == poll_remote_offset); - constexpr auto timeout_check_interval = 10000ms; // Minimum time before for checking for connections to close since the last check - auto last_conn_timeout = std::chrono::steady_clock::now(); if (!timers) timers.reset(zmq_timers_new()); + auto do_conn_cleanup = [this] { proxy_conn_cleanup(); }; + using CleanupLambda = decltype(do_conn_cleanup); + if (-1 == zmq_timers_add(timers.get(), + std::chrono::milliseconds{CONN_CHECK_INTERVAL}.count(), + // Wrap our lambda into a C function pointer where we pass in the lambda pointer as extra arg + [](int /*timer_id*/, void* cleanup) { (*static_cast(cleanup))(); }, + &do_conn_cleanup)) { + throw zmq::error_t{}; + } + std::vector parts; while (true) { @@ -932,9 +1047,9 @@ void LokiMQ::proxy_loop() { // We round-robin connections when pulling off pending messages one-by-one rather than // pulling off all messages from one connection before moving to the next; thus in cases of // contention we end up fairly distributing. 
- const size_t num_sockets = remotes.size() + listener.connected(); - std::queue queue_index; - for (size_t i = 0; i < num_sockets; i++) + const int num_sockets = remotes.size() + listener.connected(); + std::queue queue_index; + for (int i = 0; i < num_sockets; i++) queue_index.push(i); for (parts.clear(); !queue_index.empty() && workers.size() < max_workers; parts.clear()) { @@ -958,23 +1073,11 @@ void LokiMQ::proxy_loop() { proxy_to_worker(i, parts); } - // Drop idle connections (if we haven't done it in a while) but *only* if we have some idle - // general workers: if we don't have any idle workers then we may still have incoming messages which - // we haven't processed yet and those messages might end up reset the last activity time. - if (workers.size() < general_workers) { - auto now = std::chrono::steady_clock::now(); - if (now - last_conn_timeout >= timeout_check_interval) { - LMQ_LOG(trace, "closing idle connections"); - proxy_expire_idle_peers(); - last_conn_timeout = now; - } - } - LMQ_LOG(trace, "done proxy loop"); } } -std::pair LokiMQ::get_command(std::string& command) { +std::pair*> LokiMQ::get_command(std::string& command) { if (command.size() > MAX_CATEGORY_LENGTH + 1 + MAX_COMMAND_LENGTH) { LMQ_LOG(warn, "Invalid command '", command, "': command too long"); return {}; @@ -1082,14 +1185,20 @@ void LokiMQ::proxy_worker_message(std::vector& parts) { } } -decltype(LokiMQ::peers)::iterator LokiMQ::proxy_lookup_peer(zmq::message_t& msg) { +decltype(LokiMQ::peers)::iterator LokiMQ::proxy_lookup_peer(int conn_index, zmq::message_t& msg) { + bool is_outgoing_conn = !listener.connected() || conn_index > 0; + std::string pubkey; - bool service_node; - try { - extract_pubkey(msg, pubkey, service_node); - } catch (...) 
{ - LMQ_LOG(error, "Internal error: message metadata not set or invalid; dropping message"); - throw std::out_of_range("message pubkey metadata invalid"); + bool service_node = false; + if (!is_outgoing_conn) { + try { + extract_pubkey(msg, pubkey, service_node); + } catch (...) { + LMQ_LOG(error, "Internal error: message metadata not set or invalid; dropping message"); + throw std::out_of_range("message pubkey metadata invalid"); + } + } else { + pubkey = remotes[conn_index].first; } auto it = peers.find(pubkey); @@ -1099,11 +1208,77 @@ decltype(LokiMQ::peers)::iterator LokiMQ::proxy_lookup_peer(zmq::message_t& msg) return it; } -bool LokiMQ::proxy_handle_builtin(size_t conn_index, std::vector& parts) { - (void) conn_index; // FIXME - auto cmd = view(parts.front()); - if (cmd == "BYE") { - auto pit = proxy_lookup_peer(parts.front()); +// Return true if we recognized/handled the builtin command (even if we reject it for whatever +// reason) +bool LokiMQ::proxy_handle_builtin(int conn_index, std::vector& parts) { + string_view route, cmd; + bool is_outgoing_conn = !listener.connected() || conn_index > 0; + if (parts.size() < (is_outgoing_conn ? 1 : 2)) { + LMQ_LOG(warn, "Received empty message; ignoring"); + return true; + } + if (is_outgoing_conn) { + cmd = view(parts[0]); + } else { + route = view(parts[0]); + cmd = view(parts[1]); + } + LMQ_LOG(trace, "Checking for builtins: ", cmd, " from ", peer_address(parts.back())); + + if (cmd == "REPLY") { + size_t tag_pos = (is_outgoing_conn ? 
1 : 2); // index of the reply tag: directly after the command, which itself follows the route on incoming connections + if (parts.size() <= tag_pos) { + LMQ_LOG(warn, "Received REPLY without a reply tag; ignoring"); + return true; + } + std::string reply_tag = view(parts[tag_pos]); + auto it = pending_requests.find(reply_tag); + if (it != pending_requests.end()) { + LMQ_LOG(debug, "Received REPLY for pending command; scheduling callback"); + std::vector data; + data.reserve(parts.size() - (tag_pos + 1)); + for (auto part = parts.begin() + (tag_pos + 1); part != parts.end(); ++part) + data.emplace_back(view(*part)); + proxy_schedule_job([callback=std::move(it->second.second), data=std::move(data)] { callback(true, std::move(data)); }); + // The callback has been moved out of the entry: erase it so that the timeout sweep in proxy_conn_cleanup() cannot invoke the moved-from callback later. + pending_requests.erase(it); + } else { + LMQ_LOG(warn, "Received REPLY with unknown or already handled reply tag (", to_hex(reply_tag), "); ignoring"); + } + return true; + } else if (cmd == "HI") { + if (is_outgoing_conn) { + LMQ_LOG(warn, "Got invalid 'HI' message on an outgoing connection; ignoring"); + return true; + } + LMQ_LOG(info, "Incoming client from ", peer_address(parts.back()), " send HI, replying with HELLO"); + send_routed_message(listener, route, "HELLO"); + return true; + } else if (cmd == "HELLO") { + if (!is_outgoing_conn) { + LMQ_LOG(warn, "Got invalid 'HELLO' message on an incoming connection; ignoring"); + return true; + } + auto it = std::find_if(pending_connects.begin(), pending_connects.end(), + [&](auto& pc) { return std::get<0>(pc) == conn_index; }); + if (it == pending_connects.end()) { + LMQ_LOG(warn, "Got invalid 'HELLO' message on an already handshaked outgoing connection; ignoring"); + return true; + } + LMQ_LOG(info, "Got initial HELLO server response from ", peer_address(parts.back())); + size_t pksize = 32; // NOTE(review): remotes[conn_index] below assumes conn_index is already an index into remotes even when the listener occupies slot 0 -- confirm against proxy_loop + remotes[conn_index].first.resize(pksize); + remotes[conn_index].second.getsockopt(ZMQ_CURVE_SERVERKEY, &remotes[conn_index].first[0], &pksize); + auto &peer = peers[remotes[conn_index].first]; + peer.idle_expiry = 365 * 24h; + peer.outgoing = conn_index; + peer.service_node = false; + peer.activity(); + 
proxy_schedule_job([on_success=std::move(std::get<2>(*it)), pk=remotes[conn_index].first] { on_success(std::move(pk)); }); + pending_connects.erase(it); + return true; + } else if (cmd == "BYE") { + auto pit = proxy_lookup_peer(conn_index, parts.front()); proxy_close_outgoing(pit); return true; } @@ -1191,7 +1366,7 @@ void LokiMQ::proxy_process_queue() { } void LokiMQ::proxy_to_worker(size_t conn_index, std::vector& parts) { - auto pit = proxy_lookup_peer(parts.back()); + auto pit = proxy_lookup_peer(conn_index, parts.back()); string_view pubkey = pit->first; auto& peer_info = pit->second; @@ -1234,6 +1409,11 @@ void LokiMQ::proxy_to_worker(size_t conn_index, std::vector& par return; } + if (cat_call.second->second /*is_request*/ && data_parts.empty()) { + LMQ_LOG(warn, "Received an invalid request command with no reply tag; dropping message"); + return; + } + auto& run = get_idle_worker(); run.is_batch_job = false; run.cat = &category; @@ -1411,11 +1591,81 @@ LokiMQ::~LokiMQ() { LMQ_LOG(info, "LokiMQ proxy thread has stopped"); } -void LokiMQ::connect(const std::string &pubkey, std::chrono::milliseconds keep_alive, const std::string &hint) { - detail::send_control(get_control_socket(), "CONNECT", bt_serialize({{"pubkey",pubkey}, {"keep-alive",keep_alive.count()}, {"hint",hint}})); +void LokiMQ::connect_sn(string_view pubkey, std::chrono::milliseconds keep_alive, string_view hint) { + check_started(proxy_thread, "connect"); + + detail::send_control(get_control_socket(), "CONNECT_SN", bt_serialize({{"pubkey",pubkey}, {"keep-alive",keep_alive.count()}, {"hint",hint}})); } -inline void LokiMQ::job(std::function f) { +void LokiMQ::connect_remote(string_view remote, ConnectSuccess on_connect, ConnectFailure on_failure, + string_view pubkey, std::chrono::milliseconds timeout) { + if (!proxy_thread.joinable()) + LMQ_LOG(warn, "connect_remote() called before start(); this won't take effect until start() is called"); + + LMQ_LOG(trace, "telling proxy to connect to ", 
remote, ", expecting pubkey [", to_hex(pubkey), "]"); + detail::send_control(get_control_socket(), "CONNECT_REMOTE", bt_serialize({ + {"remote", remote}, + {"pubkey", pubkey}, + {"timeout", timeout.count()}, + {"connect", reinterpret_cast(new ConnectSuccess{std::move(on_connect)})}, + {"failure", reinterpret_cast(new ConnectFailure{std::move(on_failure)})}, + })); +} + +void LokiMQ::proxy_connect_remote(bt_dict_consumer data) { + ConnectSuccess on_connect; + ConnectFailure on_failure; + std::string remote; + std::string remote_pubkey; + std::chrono::milliseconds timeout = REMOTE_CONNECT_TIMEOUT; + + if (data.skip_until("connect")) { + auto* ptr = reinterpret_cast(data.consume_integer()); + on_connect = std::move(*ptr); + delete ptr; + } + if (data.skip_until("failure")) { + auto* ptr = reinterpret_cast(data.consume_integer()); + on_failure = std::move(*ptr); + delete ptr; + } + if (data.skip_until("pubkey")) { + remote_pubkey = data.consume_string(); + assert(remote_pubkey.size() == 32 || remote_pubkey.empty()); + } + if (data.skip_until("remote")) + remote = data.consume_string(); + if (data.skip_until("timeout")) + timeout = std::chrono::milliseconds{data.consume_integer()}; + + if (remote.empty()) + throw std::runtime_error("Internal error: CONNECT_REMOTE proxy command missing required 'remote' value"); + + LMQ_LOG(info, "Establishing remote connection to ", remote, remote_pubkey.empty() ? 
" (any pubkey)" : " expecting pubkey " + to_hex(remote_pubkey)); + + zmq::socket_t sock{context, zmq::socket_type::dealer}; + try { + setup_outgoing_socket(sock, remote_pubkey); + sock.connect(remote); + } catch (const zmq::error_t &e) { + proxy_schedule_job([on_failure=std::move(on_failure), what="connect() failed: "s+e.what()] { on_failure(std::move(what)); }); + return; + } + + LMQ_LOG(debug, "Opened new zmq socket to ", remote, ", sending HI"); + send_direct_message(sock, "HI"); + add_pollitem(sock); + remotes.emplace_back("", std::move(sock)); + pending_connects.emplace_back(remotes.size()-1, std::chrono::steady_clock::now() + timeout, + std::move(on_connect), std::move(on_failure)); +} + +void LokiMQ::disconnect_remote(string_view id, std::chrono::milliseconds linger) { + (void)id, (void)linger; +} + + +void LokiMQ::job(std::function f) { auto* b = new Batch; b->add_job(std::move(f)); auto* baseptr = static_cast(b); @@ -1479,6 +1729,15 @@ std::ostream &operator<<(std::ostream &os, LogLevel lvl) { return os; } +std::string make_random_string(size_t size) { + static thread_local std::mt19937_64 rng{std::random_device{}()}; + static thread_local std::uniform_int_distribution dist{std::numeric_limits::min(), std::numeric_limits::max()}; + std::string rando; + rando.reserve(size); + for (size_t i = 0; i < size; i++) + rando += dist(rng); + return rando; } +} // namespace lokimq // vim:sw=4:et diff --git a/lokimq/lokimq.h b/lokimq/lokimq.h index 82bb52d..a21439a 100644 --- a/lokimq/lokimq.h +++ b/lokimq/lokimq.h @@ -91,8 +91,10 @@ class Message { public: LokiMQ& lokimq; ///< The owning LokiMQ object std::vector data; ///< The provided command data parts, if any. - string_view pubkey; ///< The originator pubkey (32 bytes) + string_view id; ///< The remote's unique, opaque id for routing. 
+ string_view pubkey; ///< The remote's pubkey (32 bytes) bool service_node; ///< True if the pubkey is an active SN (note that this is only checked on initial connection, not every received message) + std::string reply_tag; ///< If the invoked command is a request command this is the required reply tag that will be prepended by `send_reply()`. /// Constructor Message(LokiMQ& lmq) : lokimq{lmq} {} @@ -101,30 +103,45 @@ public: Message(const Message&) = delete; Message& operator=(const Message&) = delete; - /// Sends a reply. Arguments are forwarded to send() but with send_option::optional{} added - /// if the originator is not a SN. For SN messages (i.e. where `sn` is true) this is a - /// "strong" reply by default in that the proxy will attempt to establish a new connection - /// to the SN if no longer connected. For non-SN messages the reply will be attempted using - /// the available routing information, but if the connection has already been closed the - /// reply will be dropped. + /// Sends a command back to whomever sent this message. Arguments are forwarded to send() but + /// with send_option::optional{} added if the originator is not a SN. For SN messages (i.e. + /// where `sn` is true) this is a "strong" reply by default in that the proxy will attempt to + /// establish a new connection to the SN if no longer connected. For non-SN messages the reply + /// will be attempted using the available routing information, but if the connection has already + /// been closed the reply will be dropped. /// /// If you want to send a non-strong reply even when the remote is a service node then add /// an explicit `send_option::optional()` argument. template - void reply(const std::string& command, Args&&... args); + void send_back(const std::string& command, Args&&... args); + + /// Sends a reply to a request. This takes no command: the command is always the built-in + /// "REPLY" command, followed by the unique reply tag, then any reply data parts. 
All other + /// arguments are as in `send_back()`. + template + void send_reply(Args&&... args); }; // Forward declarations; see batch.h namespace detail { class Batch; } template class Batch; - /** The keep-alive time for a send() that results in a establishing a new outbound connection. To * use a longer keep-alive to a host call `connect()` first with the desired keep-alive time or pass * the send_option::keep_alive. */ static constexpr auto DEFAULT_SEND_KEEP_ALIVE = 30s; +// How frequently we cleanup connections (closing idle connections, calling connect or request failure callbacks) +static constexpr auto CONN_CHECK_INTERVAL = 1s; + +// The default timeout for connect_remote() +static constexpr auto REMOTE_CONNECT_TIMEOUT = 10s; + +// The minimum amount of time we wait for a reply to a REQUEST before calling the callback with +// `false` to signal a timeout. +static constexpr auto REQUEST_TIMEOUT = 15s; + /// Maximum length of a category static constexpr size_t MAX_CATEGORY_LENGTH = 50; @@ -195,12 +212,21 @@ public: /// The callback type for registered commands. using CommandCallback = std::function; + /// The callback for making requests. This is called with `true` and a (moved) vector of data + /// part strings when we get a reply, or `false` and empty vector on timeout. + using ReplyCallback = std::function data)>; + /// Called to write a log message. This will only be called if the `level` is >= the current /// LokiMQ object log level. It must be a raw function pointer (or a capture-less lambda) for /// performance reasons. Takes four arguments: the log level of the message, the filename and /// line number where the log message was invoked, and the log message itself. 
using Logger = std::function; + /// Callback for the success case of connect_remote() + using ConnectSuccess = std::function; + /// Callback for the failure case of connect_remote() + using ConnectFailure = std::function; + /// Explicitly non-copyable, non-movable because most things here aren't copyable, and a few /// things aren't movable, either. If you need to pass the LokiMQ instance around, wrap it /// in a unique_ptr or shared_ptr. @@ -211,11 +237,11 @@ public: /** How long to wait for handshaking to complete on external connections before timing out and * closing the connection. Setting this only affects new outgoing connections. */ - std::chrono::milliseconds SN_HANDSHAKE_TIME = 10s; + std::chrono::milliseconds HANDSHAKE_TIME = 10s; /** Maximum incoming message size; if a remote tries sending a message larger than this they get * disconnected. -1 means no limit. */ - int64_t SN_ZMQ_MAX_MSG_SIZE = 1 * 1024 * 1024; + int64_t MAX_MSG_SIZE = 1 * 1024 * 1024; /** How long (in ms) to linger sockets when closing them; this is the maximum time zmq spends * trying to sending pending messages before dropping them and closing the underlying socket @@ -254,6 +280,10 @@ private: /// Info about a peer's established connection to us. Note that "established" means both /// connected and authenticated. struct peer_info { + /// Pubkey of the remote; can be empty (especially before handshake) but will only be set if + /// the pubkey has been verified. + std::string pubkey; + /// True if we've authenticated this peer as a service node. bool service_node = false; @@ -281,15 +311,20 @@ private: std::chrono::milliseconds idle_expiry; }; - struct pk_hash { - size_t operator()(const std::string& pubkey) const { - size_t h; - std::memcpy(&h, pubkey.data(), sizeof(h)); - return h; - } - }; - /// Currently peer connections, pubkey -> peer_info - std::unordered_map peers; + /// Currently peer connections: id -> peer_info. 
id == pubkey for incoming and outgoing SN + /// connections; random string for outgoing direct connections. + std::unordered_map peers; + + /// Remotes we are still trying to connect to (via connect_remote(), not connect_sn()); when + /// we pass handshaking we move them out of here and (if set) trigger the on_connect callback. + /// Unlike regular node-to-node peers, these have an extra "HI"/"HELLO" sequence that we use + /// before we consider ourselves connected to the remote. + std::vector> pending_connects; + + /// Pending requests that have been sent out but have not yet received a matching "REPLY". The value + /// is the timeout timestamp. + std::unordered_map> + pending_requests; /// different polling sockets the proxy handler polls: this always contains some internal /// sockets for inter-thread communication followed by listener socket and a pollitem for every @@ -309,6 +344,8 @@ private: /// The outgoing remote connections we currently have open along with the remote pubkeys. Each /// element [i] here corresponds to an the pollitem_t at pollitems[i+1+poll_internal_size]. /// (Ideally we'd use one structure, but zmq requires the pollitems be in contiguous storage). + /// For new connections established via connect_remote the pubkey will be empty until we + /// do the HI/HELLO handshake over the socket. std::vector> remotes; /// Socket we listen on to receive control messages in the proxy thread. Each thread has its own @@ -351,21 +388,25 @@ private: /// Does the proxying work void proxy_loop(); + void proxy_conn_cleanup(); + void proxy_worker_message(std::vector& parts); void proxy_process_queue(); Batch* proxy_schedule_job(std::function f); - /// Looks up a peers element given a zmq message (which has the pubkey and sn status metadata - /// set during initial connection authentication), creating a new peer element if required.
- decltype(peers)::iterator proxy_lookup_peer(zmq::message_t& msg); + /// Looks up a peers element given a connect index (for outgoing connections where we already + /// knew the pubkey and SN status) or an incoming zmq message (which has the pubkey and sn + /// status metadata set during initial connection authentication), creating a new peer element + /// if required. + decltype(peers)::iterator proxy_lookup_peer(int conn_index, zmq::message_t& msg); /// Handles built-in primitive commands in the proxy thread for things like "BYE" that have to /// be done in the proxy thread anyway (if we forwarded to a worker the worker would just have /// to send an instruction back to the proxy to do it). Returns true if one was handled, false /// to continue with sending to a worker. - bool proxy_handle_builtin(size_t conn_index, std::vector& parts); + bool proxy_handle_builtin(int conn_index, std::vector& parts); struct run_info; /// Gets an idle worker's run_info and removes the worker from the idle worker list. If there @@ -387,24 +428,31 @@ private: /// gets called after all works have done so. void proxy_quit(); + // Sets the various properties on an outgoing socket prior to connection. + void setup_outgoing_socket(zmq::socket_t& socket, string_view remote_pubkey = {}); + /// Common connection implementation used by proxy_connect/proxy_send. Returns the socket /// and, if a routing prefix is needed, the required prefix (or an empty string if not needed). /// For an optional connect that fail, returns nullptr for the socket. - std::pair proxy_connect(const std::string& pubkey, const std::string& connect_hint, bool optional, bool incoming_only, std::chrono::milliseconds keep_alive); + std::pair proxy_connect_sn(const std::string& pubkey, const std::string& connect_hint, bool optional, bool incoming_only, std::chrono::milliseconds keep_alive); - /// CONNECT command telling us to connect to a new pubkey. 
Returns the socket (which could be + /// CONNECT_SN command telling us to connect to a new pubkey. Returns the socket (which could be /// existing or a new one). - std::pair proxy_connect(bt_dict&& data); + std::pair proxy_connect_sn(bt_dict&& data); + + /// Opens a new connection to a remote, with callbacks. This is the proxy-side implementation + /// of the `connect_remote()` call. + void proxy_connect_remote(bt_dict_consumer data); /// Called to disconnect our remote connection to the given pubkey (if we have one). void proxy_disconnect(const std::string& pubkey); /// SEND command. Does a connect first, if necessary. - void proxy_send(bt_dict&& data); + void proxy_send(bt_dict_consumer data); /// REPLY command. Like SEND, but only has a listening socket route to send back to and so is /// weaker (i.e. it cannot reconnect to the SN if the connection is no longer open). - void proxy_reply(bt_dict&& data); + void proxy_reply(bt_dict_consumer data); /// Currently active batches. std::unordered_set batches; @@ -438,6 +486,9 @@ private: /// affects outgoing connections; incomings connections are the responsibility of the other end. void proxy_expire_idle_peers(); + /// Helper method to actually close a remote connection and update the stuff that needs updating. + void proxy_close_remote(int removed, bool linger = true); + /// Closes an outgoing connection immediately, updates internal variables appropriately. /// Returns the next iterator (the original may or may not be removed from peers, depending on /// whether or not it also has an active incoming connection). @@ -445,7 +496,7 @@ private: struct category { Access access; - std::unordered_map commands; + std::unordered_map> commands; unsigned int reserved_threads = 0; unsigned int active_threads = 0; int max_queue = 200; @@ -466,7 +517,7 @@ private: /// Retrieve category and callback from a command name, including alias mapping. Warns on /// invalid commands and returns nullptrs. 
The command name will be updated in place if it is /// aliased to another command. - std::pair get_command(std::string& command); + std::pair*> get_command(std::string& command); /// Checks a peer's authentication level. Returns true if allowed, warns and returns false if /// not. @@ -479,11 +530,12 @@ private: category& cat; std::string command; std::vector data_parts; - const CommandCallback* callback; + const std::pair* callback; std::string pubkey; + std::string id; bool service_node; - pending_command(category& cat, std::string command, std::vector data_parts, const CommandCallback* callback, std::string pubkey, bool service_node) + pending_command(category& cat, std::string command, std::vector data_parts, const std::pair* callback, std::string pubkey, bool service_node) : cat{cat}, command{std::move(command)}, data_parts{std::move(data_parts)}, callback{callback}, pubkey{std::move(pubkey)}, service_node{service_node} {} }; std::list pending_commands; @@ -510,8 +562,8 @@ private: int batch_jobno; // >= 0 for a job, -1 for the completion job union { - const CommandCallback* callback; // set if !is_batch_job - detail::Batch* batch; // set if is_batch_job + const std::pair* callback; // set if !is_batch_job + detail::Batch* batch; // set if is_batch_job }; // These belong to the proxy thread and must not be accessed by a worker: @@ -576,6 +628,17 @@ public: AllowFunc allow_connection, Logger logger = [](LogLevel, const char*, int, std::string) { }); + /** + * Simplified LokiMQ constructor for a client. This does not bind, generates ephemeral keys, + * and doesn't have peer_lookup capabilities, and treats all remotes as "basic", non-service + * node connections (for command authentication purposes).
+ */ + explicit LokiMQ(Logger logger = [](LogLevel, const char*, int, std::string) { }) + : LokiMQ("", "", false, {}, + [](const auto&) { return std::string{}; }, + [](string_view, string_view) { return Allow{AuthLevel::basic}; }, + std::move(logger)) {} + /** * Destructor; instructs the proxy to quit. The proxy tells all workers to quit, waits for them * to quit and rejoins the threads then quits itself. The outer thread (where the destructor is @@ -630,6 +693,15 @@ public: */ void add_command(const std::string& category, std::string name, CommandCallback callback); + /** + * Adds a new "request" command to an existing category. These commands are just like normal + * commands, but are expected to call `msg.send_reply()` with any data parts on every request, + * while normal commands are more general. + * + * Parameters given here are identical to `add_command()`. + */ + void add_request_command(const std::string& category, std::string name, CommandCallback callback); + /** * Adds a command alias; this is intended for temporary backwards compatibility: if any aliases * are defined then every command (not just aliased ones) has to be checked on invocation to see @@ -648,8 +720,12 @@ public: /** * Sets the number of worker threads reserved for batch jobs. If not called this defaults to - * half the number of hardware threads available (rounded up). This works exactly like reserved_threads - * for a category, but allows to batch jobs. See category for details. + * half the number of hardware threads available (rounded up). This works exactly like + * reserved_threads for a category, but allows to batch jobs. See category for details. + * + * Note that some internal jobs are counted as batch jobs: in particular timers added via + * add_timer() and replies received in response to request commands currently each take a batch + * job slot when invoked. * * Cannot be called after start()ing the LokiMQ instance. 
*/ @@ -697,14 +773,54 @@ public: * guarantee that the hint will be used; it is only usefully specified if the * connection location has already been incidentally determined). */ - void connect(const std::string& pubkey, std::chrono::milliseconds keep_alive = 5min, const std::string& hint = ""); + void connect_sn(string_view pubkey, std::chrono::milliseconds keep_alive = 5min, string_view hint = {}); /** - * Queue a message to be relayed to the SN identified with the given pubkey without expecting a - * reply. LokiMQ will attempt to relay the message (first connecting and handshaking if not - * already connected to the given SN). + * Establish a connection to the given remote with callbacks invoked on a successful or failed + * connection. The success callback gives you the pubkey of the remote, which can then be used + * to send commands to the remote (via `send()`). This is generally intended for cases where the remote is + * being treated as the "server" and the local connection as a "client"; for connections between + * peers (i.e. between SNs) you generally want connect_sn() instead. If pubkey is non-empty + * then the remote must have that pubkey; if empty then any pubkey is allowed. * - * If a new connection it established it will have a relatively short (30s) idle timeout. If + * Unlike `connect_sn`, the connection established here will be kept open + * indefinitely (until you call `disconnect_remote()`). + * + * The `on_connect` and `on_failure` callbacks are invoked when a connection has been + * established or failed to establish. + * + * @param remote the remote connection address, such as `tcp://localhost:1234`. + * @param on_connect called with the remote's pubkey after the connection has + * been established and handshaked. + * @param on_failure called with a failure message if we fail to connect. + * @param pubkey the required remote pubkey (empty to accept any).
+ * @param timeout how long to try before aborting the connection attempt and calling the + * on_failure callback. Note that the connection can fail for various reasons before the + * timeout. + */ + void connect_remote(string_view remote, ConnectSuccess on_connect, ConnectFailure on_failure, + string_view pubkey = {}, std::chrono::milliseconds timeout = REMOTE_CONNECT_TIMEOUT); + + /** + * Disconnects an outgoing connection established with `connect_remote()`. + * + * @param id the connection id of a connection established via `connect_remote()`. + * + * @param linger how long to allow the connection to linger while there are still pending + * outbound messages to it before disconnecting and dropping any pending messages. (Note that + * this lingering is internal; the disconnect_remote() call does not block). The default is 1 + * second. + */ + void disconnect_remote(string_view id, std::chrono::milliseconds linger = 1s); + + /** + * Queue a message to be relayed to the node identified with the given identifier (for SNs and + * incoming connections this is a pubkey; for connections established with `connect_remote()` this will + * be the opaque identifier string provided by `connect_remote()`), without expecting a reply. LokiMQ will + * attempt to relay the message (first connecting and handshaking if not already connected + * and the given pubkey is a service node's pubkey). + * + * If a new connection is established it will have a relatively short (30s) idle timeout. If * the connection should stay open longer you should call `connect(pubkey, IDLETIME)` first. * * Note that this method (along with connect) doesn't block waiting for a connection or for the @@ -712,7 +828,7 @@ public: * generally try hard to deliver it (reconnecting if the connection fails), but if the * connection fails persistently the message will eventually be dropped.
* - * @param pubkey - the pubkey to send this to + * @param pubkey - the pubkey, or the identifier provided by `connect_remote()`, to send this to * @param cmd - the first data frame value which is almost always the remote "category.command" name * @param opts - any number of std::string and send options. Each send option affects * how the send works; each string becomes a serialized message part. @@ -728,19 +844,20 @@ public: template void send(const std::string& pubkey, const std::string& cmd, const T&... opts); - /** - * Similar to the above, but takes an iterator pair of message parts to send after the value. + /** Send a command configured as a "REQUEST" command: the data parts will be prefixed with a + * random identifier. The remote is expected to reply with a ["REPLY", IDENTIFIER, ...] + * message, at which point we invoke the given callback with any [...] parts of the reply. * - * @param pubkey - the pubkey to send this to - * @param cmd - the value of the first message part (i.e. the remote command) - * @param first - an input iterator to std::string values - * @param last - the beyond-the-end iterator - * @param opts - any number of send options. This may also contain additional message strings - * which will be appended after the `[first, last)` message parts. + * @param pubkey - the pubkey to send this request to + * @param cmd - the command name + * @param callback - the callback to invoke when we get a reply. Called with a true value and + * the data strings when a reply is received, or false and an empty vector of data parts if we + * get no reply in the timeout interval. + * @param opts - anything else (i.e. strings, send_options) is forwarded to send(). */ - template - void send(const std::string& pubkey, const std::string& cmd, InputIt first, InputIt end, const T&... opts); - + template + void request(const std::string& pubkey, const std::string& cmd, ReplyCallback callback, + const T&...
opts); /// The key pair this LokiMQ was created with; if empty keys were given during construction then /// this returns the generated keys. @@ -760,27 +877,38 @@ public: * submitting a single-job, no-completion batch. */ void job(std::function f); + + /** + * Adds a timer that gets scheduled periodically in the job queue. Normally jobs are not + * double-booked: that is, a new timed job will not be scheduled if the timer fires before a + * previously scheduled callback of the job has completed. If you want to override this + * (so that, under heavy load or long jobs, there can be more than one of the same job scheduled + * or running at a time) then specify `squelch` as `false`. + */ + void add_timer(std::function job, std::chrono::milliseconds interval, bool squelch = true); }; /// Namespace for options to the send() method namespace send_option { -/// `serialized` lets you serialize once when sending the same data to many peers by constructing a -/// single serialized option and passing it repeatedly rather than needing to reserialize on each -/// send. -struct serialized { - std::string data; - template - serialized(const T& arg) : data{lokimq::bt_serialize(arg)} {} +template +struct data_parts_impl { + InputIt begin, end; + data_parts_impl(InputIt begin, InputIt end) : begin{std::move(begin)}, end{std::move(end)} {} }; +/// Specifies an iterator pair of data parts to send, for when the number of arguments to send() +/// cannot be determined at compile time. +template +data_parts_impl data_parts(InputIt begin, InputIt end) { return {std::move(begin), std::move(end)}; } + /// Specifies a connection hint when passed in to send(). If there is no current connection to the /// peer then the hint is used to save a call to the SNRemoteAddress to get the connection location. /// (Note that there is no guarantee that the given hint will be used or that a SNRemoteAddress call /// will not also be done.)
struct hint { std::string connect_hint; - hint(std::string connect_hint) : connect_hint{std::move(connect_hint)} {} + explicit hint(std::string connect_hint) : connect_hint{std::move(connect_hint)} {} }; /// Does a send() if we already have a connection (incoming or outgoing) with the given peer, @@ -806,75 +934,92 @@ namespace detail { // data (only sent if the data is non-empty). void send_control(zmq::socket_t& sock, string_view cmd, std::string data = {}); -/// Base case: takes a serializable value and appends it to the message parts -template -void apply_send_option(bt_list& parts, bt_dict&, const T& arg) { - parts.push_back(lokimq::bt_serialize(arg)); +/// Base case: takes a string-like value and appends it to the message parts +inline void apply_send_option(bt_list& parts, bt_dict&, string_view arg) { + parts.push_back(arg); } -/// `serialized` specialization: lets you serialize once when sending the same data to many peers -template <> inline void apply_send_option(bt_list& parts, bt_dict& , const send_option::serialized& serialized) { - parts.push_back(serialized.data); +/// `data_parts` specialization: appends a range of serialized data parts to the parts to send +template +void apply_send_option(bt_list& parts, bt_dict&, const send_option::data_parts_impl data) { + for (auto it = data.begin; it != data.end; ++it) + parts.push_back(lokimq::bt_deserialize(*it)); } /// `hint` specialization: sets the hint in the control data -template <> inline void apply_send_option(bt_list&, bt_dict& control_data, const send_option::hint& hint) { +inline void apply_send_option(bt_list&, bt_dict& control_data, const send_option::hint& hint) { control_data["hint"] = hint.connect_hint; } /// `optional` specialization: sets the optional flag in the control data -template <> inline void apply_send_option(bt_list&, bt_dict& control_data, const send_option::optional &) { +inline void apply_send_option(bt_list&, bt_dict& control_data, const send_option::optional &) { 
control_data["optional"] = 1; } /// `incoming` specialization: sets the optional flag in the control data -template <> inline void apply_send_option(bt_list&, bt_dict& control_data, const send_option::incoming &) { +inline void apply_send_option(bt_list&, bt_dict& control_data, const send_option::incoming &) { control_data["incoming"] = 1; } /// `keep_alive` specialization: increases the outgoing socket idle timeout (if shorter) -template <> inline void apply_send_option(bt_list&, bt_dict& control_data, const send_option::keep_alive& timeout) { +inline void apply_send_option(bt_list&, bt_dict& control_data, const send_option::keep_alive& timeout) { control_data["keep-alive"] = timeout.time.count(); } -/// Calls apply_send_option on each argument and returns a bt_dict with the command plus data stored -/// in the "send" key plus whatever else is implied by any given option arguments. -template -bt_dict send_control_data(const std::string& cmd, InputIt begin, InputIt end, const T &...opts) { +} // namespace detail + +template +void LokiMQ::send(const std::string& pubkey, const std::string& cmd, const T &...opts) { bt_dict control_data; bt_list parts{{cmd}}; - parts.insert(parts.end(), std::move(begin), std::move(end)); #ifdef __cpp_fold_expressions (detail::apply_send_option(parts, control_data, opts),...); #else (void) std::initializer_list{(detail::apply_send_option(parts, control_data, opts), 0)...}; #endif - control_data["send"] = std::move(parts); - return control_data; -} - -} // namespace detail - -template -void LokiMQ::send(const std::string& pubkey, const std::string& cmd, InputIt first, InputIt last, const T &...opts) { - bt_dict control_data = detail::send_control_data(cmd, std::move(first), std::move(last), opts...); control_data["pubkey"] = pubkey; + control_data["send"] = std::move(parts); detail::send_control(get_control_socket(), "SEND", bt_serialize(control_data)); } +std::string make_random_string(size_t size); + template -void LokiMQ::send(const 
std::string& pubkey, const std::string& cmd, const T &...opts) { - const std::string* no_it = nullptr; - send(pubkey, cmd, no_it, no_it, opts...); +void LokiMQ::request(const std::string& pubkey, const std::string& cmd, ReplyCallback callback, const T &...opts) { + auto reply_tag = make_random_string(15); // 15 should keep us in most stl implementations' small string optimization + bt_dict control_data; + bt_list parts{{cmd, reply_tag}}; +#ifdef __cpp_fold_expressions + (detail::apply_send_option(parts, control_data, opts),...); +#else + (void) std::initializer_list{(detail::apply_send_option(parts, control_data, opts), 0)...}; +#endif + + control_data["pubkey"] = pubkey; + control_data["send"] = std::move(parts); + control_data["request"] = true; + control_data["request_callback"] = reinterpret_cast(new ReplyCallback{std::move(callback)}); + control_data["request_tag"] = std::move(reply_tag); + detail::send_control(get_control_socket(), "SEND", bt_serialize(control_data)); } template -void Message::reply(const std::string& command, Args&&... args) { +void Message::send_back(const std::string& command, Args&&... args) { + assert(reply_tag.empty()); if (service_node) lokimq.send(pubkey, command, std::forward(args)...); else lokimq.send(pubkey, command, send_option::optional{}, std::forward(args)...); } +template +void Message::send_reply(Args&&... args) { + assert(!reply_tag.empty()); + if (service_node) lokimq.send(pubkey, "REPLY", reply_tag, std::forward(args)...); + else lokimq.send(pubkey, "REPLY", reply_tag, send_option::optional{}, std::forward(args)...); +} + + + template void LokiMQ::log_(LogLevel lvl, const char* file, int line, const T&... stuff) { @@ -890,16 +1035,7 @@ void LokiMQ::log_(LogLevel lvl, const char* file, int line, const T&... stuff) { logger(lvl, file, line, os.str()); } -std::ostream &operator<<(std::ostream &os, LogLevel lvl) { - os << (lvl == LogLevel::trace ? "trace" : - lvl == LogLevel::debug ? "debug" : - lvl == LogLevel::info ? 
"info" : - lvl == LogLevel::warn ? "warn" : - lvl == LogLevel::error ? "ERROR" : - lvl == LogLevel::fatal ? "FATAL" : - "unknown"); - return os; -} +std::ostream &operator<<(std::ostream &os, LogLevel lvl); } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index c42dd70..2af3449 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -4,6 +4,8 @@ add_subdirectory(Catch2) set(LMQ_TEST_SRC main.cpp test_batch.cpp + test_commands.cpp + test_requests.cpp ) add_executable(tests ${LMQ_TEST_SRC}) diff --git a/tests/test_commands.cpp b/tests/test_commands.cpp new file mode 100644 index 0000000..24ff083 --- /dev/null +++ b/tests/test_commands.cpp @@ -0,0 +1,64 @@ +#include "lokimq/lokimq.h" +#include +#include + +using namespace lokimq; + +TEST_CASE("basic commands", "[cmd-basic]") { + std::string listen = "tcp://127.0.0.1:4567"; + LokiMQ server{ + "", "", // generate ephemeral keys + false, // not a service node + {listen}, + [](auto &) { return ""; }, + [](auto /*ip*/, auto /*pk*/) { return Allow{AuthLevel::none, false}; }, + }; + + std::atomic hellos{0}, his{0}; + + server.add_category("public", Access{AuthLevel::none}); + server.add_command("public", "hello", [&](Message& m) { + // On every 1st, 3rd, 5th, ... 
hello send back a hi + if (hellos++ % 2 == 0) + m.send_back("public.hi"); + }); + server.start(); + + LokiMQ client( + [](LogLevel, const char* file, int line, std::string msg) { std::cerr << file << ":" << line << " --C-- " << msg << "\n"; } + ); + //client.log_level(LogLevel::trace); + + client.add_category("public", Access{AuthLevel::none}); + client.add_command("public", "hi", [&](auto&) { his++; }); + client.start(); + + std::atomic connected{false}, failed{false}; + std::string pubkey; + + client.connect_remote(listen, + [&](std::string pk) { pubkey = std::move(pk); connected = true; }, + [&](string_view) { failed = true; }, + server.get_pubkey()); + + for (int i = 0; i < 20; i++) { + if (connected.load()) + break; + std::this_thread::sleep_for(100ms); + } + REQUIRE( connected.load() ); + REQUIRE( !failed.load() ); + REQUIRE( pubkey == server.get_pubkey() ); + + client.send(pubkey, "public.hello"); + std::this_thread::sleep_for(200ms); + REQUIRE( hellos == 1 ); + REQUIRE( his == 1 ); + + for (int i = 0; i < 50; i++) + client.send(pubkey, "public.hello"); + + std::this_thread::sleep_for(200ms); + REQUIRE( hellos == 51 ); + REQUIRE( his == 26 ); +} diff --git a/tests/test_requests.cpp b/tests/test_requests.cpp new file mode 100644 index 0000000..40a2f7d --- /dev/null +++ b/tests/test_requests.cpp @@ -0,0 +1,63 @@ +#include "lokimq/lokimq.h" +#include +#include + +using namespace lokimq; + +TEST_CASE("basic requests", "[req-basic]") { + std::string listen = "tcp://127.0.0.1:5678"; + LokiMQ server{ + "", "", // generate ephemeral keys + false, // not a service node + {listen}, + [](auto &) { return ""; }, + [](auto /*ip*/, auto /*pk*/) { return Allow{AuthLevel::none, false}; }, + }; + + std::atomic hellos{0}, his{0}; + + server.add_category("public", Access{AuthLevel::none}); + server.add_request_command("public", "hello", [&](Message& m) { + m.send_reply("123"); + }); + server.start(); + + LokiMQ client( + [](LogLevel, const char* file, int line, std::string 
msg) { std::cerr << file << ":" << line << " --C-- " << msg << "\n"; } + ); + //client.log_level(LogLevel::trace); + + client.start(); + + std::atomic connected{false}, failed{false}; + std::string pubkey; + + client.connect_remote(listen, + [&](std::string pk) { pubkey = std::move(pk); connected = true; }, + [&](string_view) { failed = true; }, + server.get_pubkey()); + + for (int i = 0; i < 20; i++) { + if (connected.load()) + break; + std::this_thread::sleep_for(100ms); + } + REQUIRE( connected.load() ); + REQUIRE( !failed.load() ); + REQUIRE( pubkey == server.get_pubkey() ); + + std::atomic got_reply{false}; + bool success; + std::vector data; + client.request(pubkey, "public.hello", [&](bool ok, std::vector data_) { + got_reply = true; + success = ok; + data = std::move(data_); + }); + + // FIXME: we shouldn't need to wait this long (perhaps explore zmq send immediate?) + std::this_thread::sleep_for(1500ms); + REQUIRE( got_reply.load() ); + REQUIRE( success ); + REQUIRE( data == std::vector{{"123"}} ); +}