#pragma once #include #include "oxenmq.h" // Inside some method: // OMQ_LOG(warn, "bad ", 42, " stuff"); // #define OMQ_LOG(level, ...) log(LogLevel::level, __FILE__, __LINE__, __VA_ARGS__) #ifndef NDEBUG // Same as OMQ_LOG(trace, ...) when not doing a release build; nothing under a release build. # define OMQ_TRACE(...) log(LogLevel::trace, __FILE__, __LINE__, __VA_ARGS__) #else # define OMQ_TRACE(...) #endif namespace oxenmq { constexpr char SN_ADDR_COMMAND[] = "inproc://sn-command"; constexpr char SN_ADDR_WORKERS[] = "inproc://sn-workers"; constexpr char SN_ADDR_SELF[] = "inproc://sn-self"; constexpr char ZMQ_ADDR_ZAP[] = "inproc://zeromq.zap.01"; #ifdef OXENMQ_USE_EPOLL constexpr auto EPOLL_COMMAND_ID = std::numeric_limits::max(); constexpr auto EPOLL_WORKER_ID = std::numeric_limits::max() - 1; constexpr auto EPOLL_ZAP_ID = std::numeric_limits::max() - 2; #endif /// Destructor for create_message(std::string&&) that zmq calls when it's done with the message. extern "C" inline void message_buffer_destroy(void*, void* hint) { delete reinterpret_cast(hint); } /// Creates a message without needing to reallocate the provided string data inline zmq::message_t create_message(std::string&& data) { auto *buffer = new std::string(std::move(data)); return zmq::message_t{&(*buffer)[0], buffer->size(), message_buffer_destroy, buffer}; }; /// Create a message copying from a string_view inline zmq::message_t create_message(std::string_view data) { return zmq::message_t{data.begin(), data.end()}; } template bool send_message_parts(zmq::socket_t& sock, It begin, It end) { while (begin != end) { zmq::message_t &msg = *begin++; if (!sock.send(msg, begin == end ? zmq::send_flags::dontwait : zmq::send_flags::dontwait | zmq::send_flags::sndmore)) return false; } return true; } template bool send_message_parts(zmq::socket_t& sock, Container&& c) { return send_message_parts(sock, c.begin(), c.end()); } /// Sends a message with an initial route. `msg` and `data` can be empty: if `msg` is empty then /// the msg frame will be an empty message; if `data` is empty then the data frame will be omitted. /// `flags` is passed through to zmq: typically given `zmq::send_flags::dontwait` to throw rather /// than block if a message can't be queued. inline bool send_routed_message(zmq::socket_t& socket, std::string route, std::string msg = {}, std::string data = {}) { assert(!route.empty()); std::array msgs{{create_message(std::move(route))}}; if (!msg.empty()) msgs[1] = create_message(std::move(msg)); if (!data.empty()) msgs[2] = create_message(std::move(data)); return send_message_parts(socket, msgs.begin(), data.empty() ? std::prev(msgs.end()) : msgs.end()); } // Sends some stuff to a socket directly. If dontwait is true then we throw instead of blocking if // the message cannot be accepted by zmq (i.e. because the outgoing buffer is full). inline bool send_direct_message(zmq::socket_t& socket, std::string msg, std::string data = {}) { std::array msgs{{create_message(std::move(msg))}}; if (!data.empty()) msgs[1] = create_message(std::move(data)); return send_message_parts(socket, msgs.begin(), data.empty() ? std::prev(msgs.end()) : msgs.end()); } // Receive all the parts of a single message from the given socket. Returns true if a message was // received, false if called with flags=zmq::recv_flags::dontwait and no message was available. inline bool recv_message_parts(zmq::socket_t& sock, std::vector& parts, const zmq::recv_flags flags = zmq::recv_flags::none) { do { zmq::message_t msg; if (!sock.recv(msg, flags)) return false; parts.push_back(std::move(msg)); } while (parts.back().more()); return true; } // Same as above, but using a fixed sized array; this is only used for internal jobs (e.g. control // messages) where we know the message parts should never exceed a given size (this function does // not bounds check except in debug builds). Returns the number of message parts received, or 0 on // read error. template inline size_t recv_message_parts(zmq::socket_t& sock, std::array& parts, const zmq::recv_flags flags = zmq::recv_flags::none) { for (size_t count = 0; ; count++) { assert(count < N); if (!sock.recv(parts[count], flags)) return 0; if (!parts[count].more()) return count + 1; } } inline const char* peer_address(zmq::message_t& msg) { try { return msg.gets("Peer-Address"); } catch (...) {} return "(unknown)"; } // Returns a string view of the given message data. It's the caller's responsibility to keep the // referenced message alive. If you want a std::string instead just call `m.to_string()` inline std::string_view view(const zmq::message_t& m) { return {m.data(), m.size()}; } // Extracts and builds the "send" part of a message for proxy_send/proxy_reply inline std::list build_send_parts(oxenc::bt_list_consumer send, std::string_view route) { std::list parts; if (!route.empty()) parts.push_back(create_message(route)); while (!send.is_finished()) parts.push_back(create_message(send.consume_string())); return parts; } /// Sends a control message to a specific destination by prefixing the worker name (or identity) /// then appending the command and optional data (if non-empty). (This is needed when sending the control message /// to a router socket, i.e. inside the proxy thread). inline void route_control(zmq::socket_t& sock, std::string_view identity, std::string_view cmd, const std::string& data = {}) { sock.send(create_message(identity), zmq::send_flags::sndmore); detail::send_control(sock, cmd, data); } }