Added support for general (non-SN) communications

The existing code was largely set up for SN-to-SN or client-to-SN
communications, where messages can always get to the right place because
we can always send by pubkey.

This doesn't work when we want general communications with a random
remote address.

This commit overhauls the way loki-mq handles communication in a few
important ways:

- Listening instances no longer pass bind addresses into the
constructor; instead they call `listen_curve()` or `listen_plain()`
before invoking `start()`.

- `listen_curve()` is equivalent to the existing bind support: it
listens on a socket and accepts encrypted handshaked connections from
anyone who already knows the server's public key.

- `listen_plain()` is all new: it sets up a plain text listening socket
over which random clients can connect and talk.  End-points aren't
verified, and it isn't encrypted, but if you don't know who you are
talking to then encryption isn't doing anything anyway.

- Connecting to a remote now connections in CURVE encryption or NULL
(plain-text) encryption based on whether you provide a remote_pubkey.
For CURVE, the connection will fail if the pubkey does not match.

- `ConnectionID` objects are now returned when connecting to a remote
address; this object is then passed in to send/request/etc. to direct
the message.  For SN communication, ConnectionID's can be created
implicitly from SN pubkey strings, so the existing interface of
`lmq.send(pubkey, ...)` will still work in most cases.

- A ConnectionID is now passed to the ConnectSuccess and ConnectFailure
callbacks.  This can be used to uniquely identify which connection
succeeded or failed, and can determine whether the remote is a service
node (`.sn()`) and/or the pubkey (`.pubkey()`).  (Obviously the service
node status is only available when the client can do service node
lookups, and the pubkey() is only non-empty for encrypted connections).
This commit is contained in:
Jason Rhinelander 2020-02-28 00:16:43 -04:00
parent e4d371b026
commit 57f0ca74da
6 changed files with 1028 additions and 581 deletions

File diff suppressed because it is too large Load Diff

View File

@ -76,9 +76,8 @@ struct Access {
bool local_sn = false;
};
/// Return type of the AllowFunc: this determines whether we allow the connection at all, and if
/// so, sets the initial authentication level and tells LokiMQ whether the other hand is an
/// active SN.
/// Return type of the AllowFunc: this determines whether we allow the connection at all, and if so,
/// sets the initial authentication level and tells LokiMQ whether the other end is an active SN.
struct Allow {
AuthLevel auth = AuthLevel::none;
bool remote_sn = false;
@ -86,6 +85,69 @@ struct Allow {
class LokiMQ;
/// Opaque data structure representing a connection which supports ==, !=, < and std::hash. For
/// connections to service node this is the service node pubkey (and you can pass a 32-byte string
/// anywhere a ConnectionID is called for). For non-SN remote connections you need to keep a copy
/// of the ConnectionID returned by connect_remote().
struct ConnectionID {
ConnectionID(std::string pubkey_) : id{SN_ID}, pk{std::move(pubkey_)} {
if (pk.size() != 32)
throw std::runtime_error{"Invalid pubkey: expected 32 bytes"};
}
ConnectionID(string_view pubkey_) : ConnectionID(std::string{pubkey_}) {}
ConnectionID(const ConnectionID&) = default;
ConnectionID(ConnectionID&&) = default;
ConnectionID& operator=(const ConnectionID&) = default;
ConnectionID& operator=(ConnectionID&&) = default;
// Two ConnectionIDs are equal if they are both SNs and have matching pubkeys, or they are both
// not SNs and have matching internal IDs. (Pubkeys do not have to match for non-SNs).
bool operator==(const ConnectionID &o) const {
if (id == SN_ID && o.id == SN_ID)
return pk == o.pk;
return id == o.id;
}
bool operator!=(const ConnectionID &o) const { return !(*this == o); }
bool operator<(const ConnectionID &o) const {
if (id == SN_ID && o.id == SN_ID)
return pk < o.pk;
return id < o.id;
}
// Returns true if this ConnectionID represents a SN connection
bool sn() const { return id == SN_ID; }
// Returns this connection's pubkey, if any. (Note that it is possible to have a pubkey and not
// be a SN when connecting to secure remotes: having a non-empty pubkey does not imply that
// `sn()` is true).
const std::string& pubkey() const { return pk; }
// Default construction; creates a ConnectionID with an invalid internal ID that will not match
// an actual connection.
ConnectionID() : ConnectionID(0) {}
private:
ConnectionID(long long id) : id{id} {}
ConnectionID(long long id, std::string pubkey) : id{id}, pk{std::move(pubkey)} {}
constexpr static long long SN_ID = -1;
long long id = 0;
std::string pk;
friend class LokiMQ;
friend class std::hash<ConnectionID>;
friend std::ostream& operator<<(std::ostream& o, const ConnectionID& conn);
};
} // namespace lokimq
namespace std {
// Need this here because we stick it in an unordered_map below.
template <> struct hash<lokimq::ConnectionID> {
size_t operator()(const lokimq::ConnectionID &c) const {
return c.sn() ? std::hash<std::string>{}(c.pk) :
std::hash<long long>{}(c.id);
}
};
} // namespace std
namespace lokimq {
/// Encapsulates an incoming message from a remote connection with message details plus extra
/// info need to send a reply back through the proxy thread via the `reply()` method. Note that
/// this object gets reused: callbacks should use but not store any reference beyond the callback.
@ -93,13 +155,12 @@ class Message {
public:
LokiMQ& lokimq; ///< The owning LokiMQ object
std::vector<string_view> data; ///< The provided command data parts, if any.
string_view id; ///< The remote's unique, opaque id for routing.
string_view pubkey; ///< The remote's pubkey (32 bytes)
bool service_node; ///< True if the pubkey is an active SN (note that this is only checked on initial connection, not every received message)
ConnectionID conn; ///< The connection info for routing a reply; also contains the pubkey/sn status.
std::string route; ///< The return route for a reply (if the message was on an incoming conn)
std::string reply_tag; ///< If the invoked command is a request command this is the required reply tag that will be prepended by `send_reply()`.
/// Constructor
Message(LokiMQ& lmq) : lokimq{lmq} {}
Message(LokiMQ& lmq, ConnectionID cid) : lokimq{lmq}, conn{std::move(cid)} {}
// Non-copyable
Message(const Message&) = delete;
@ -119,9 +180,18 @@ public:
/// Sends a reply to a request. This takes no command: the command is always the built-in
/// "REPLY" command, followed by the unique reply tag, then any reply data parts. All other
/// arguments are as in `send_back()`.
/// arguments are as in `send_back()`. You should only send one reply for a command expecting
/// replies, though this is not enforced: attempting to send multiple replies will simply be
/// dropped when received by the remote. (Note, however, that it is possible to send multiple
/// messages -- e.g. you could send a reply and then also call send_back() and/or send_request()
/// to send more requests back to the sender).
template <typename... Args>
void send_reply(Args&&... args);
/// Sends a request back to whomever sent this message. This is effectively a wrapper around
/// lmq.request() that takes care of setting up the recipient arguments.
template <typename ReplyCallback, typename... Args>
void send_request(string_view cmd, ReplyCallback&& callback, Args&&... args);
};
// Forward declarations; see batch.h
@ -150,6 +220,7 @@ static constexpr size_t MAX_CATEGORY_LENGTH = 50;
/// Maximum length of a command
static constexpr size_t MAX_COMMAND_LENGTH = 200;
/**
* Class that handles LokiMQ listeners, connections, proxying, and workers. An application
* typically has just one instance of this class.
@ -172,10 +243,6 @@ private:
/// True if *this* node is running in service node mode (whether or not actually active)
bool local_service_node = false;
/// Addresses on which to listen, or empty if we only establish outgoing connections but aren't
/// listening.
std::vector<std::string> bind;
/// The thread in which most of the intermediate work happens (handling external connections
/// and proxying requests between them to worker threads)
std::thread proxy_thread;
@ -200,16 +267,18 @@ public:
/// connect to us and to set its initial authentication level.
///
/// @param ip - the ip address of the incoming connection
/// @param pubkey - the x25519 pubkey of the connecting client (32 byte string)
/// @param pubkey - the x25519 pubkey of the connecting client (32 byte string). Note that this
/// will only be non-empty for incoming connections on `listen_curve` sockets; `listen_plain`
/// sockets do not have a pubkey.
///
/// @returns an `AuthLevel` enum value indicating the default auth level for the incoming
/// connection, or AuthLevel::denied if the connection should be refused.
using AllowFunc = std::function<Allow(const std::string &ip, const std::string &pubkey)>;
using AllowFunc = std::function<Allow(string_view ip, string_view pubkey)>;
/// Callback that is invoked when we need to send a "strong" message to a SN that we aren't
/// already connected to and need to establish a connection. This callback returns the ZMQ
/// connection string we should use which is typically a string such as `tcp://1.2.3.4:5678`.
using SNRemoteAddress = std::function<std::string(const std::string &pubkey)>;
using SNRemoteAddress = std::function<std::string(string_view pubkey)>;
/// The callback type for registered commands.
using CommandCallback = std::function<void(Message& message)>;
@ -225,9 +294,9 @@ public:
using Logger = std::function<void(LogLevel level, const char* file, int line, std::string msg)>;
/// Callback for the success case of connect_remote()
using ConnectSuccess = std::function<void(const std::string& pubkey)>;
using ConnectSuccess = std::function<void(ConnectionID)>;
/// Callback for the failure case of connect_remote()
using ConnectFailure = std::function<void(const std::string& reason)>;
using ConnectFailure = std::function<void(ConnectionID, string_view)>;
/// Explicitly non-copyable, non-movable because most things here aren't copyable, and a few
/// things aren't movable, either. If you need to pass the LokiMQ instance around, wrap it
@ -253,10 +322,7 @@ public:
private:
/// The lookup function that tells us where to connect to a peer, or empty if not found.
SNRemoteAddress peer_lookup;
/// Callback to see whether the incoming connection is allowed
AllowFunc allow_connection;
SNRemoteAddress sn_lookup;
/// The log level; this is atomic but we use relaxed order to set and access it (so changing it
/// might not be instantly visible on all threads, but that's okay).
@ -272,56 +338,80 @@ private:
///////////////////////////////////////////////////////////////////////////////////
/// NB: The following are all the domain of the proxy thread (once it is started)!
/// Addresses to bind to in `start()`
std::vector<std::string> bind_addresses;
/// The socket we listen on for handling ZAP authentication requests (the other end is internal
/// to zmq which sends requests to us as needed).
zmq::socket_t zap_auth{context, zmq::socket_type::rep};
/// Our listening ROUTER socket for incoming connections (will be left unconnected if not
/// listening).
zmq::socket_t listener;
struct bind_data {
bool curve;
size_t index;
AllowFunc allow;
bind_data(bool curve, AllowFunc allow)
: curve{curve}, index{0}, allow{std::move(allow)} {}
};
/// Info about a peer's established connection to us. Note that "established" means both
/// Addresses on which we are listening (or, before start(), on which we will listen).
std::vector<std::pair<std::string, bind_data>> bind;
/// Info about a peer's established connection with us. Note that "established" means both
/// connected and authenticated.
struct peer_info {
/// Pubkey of the remote; can be empty (especially before handshake) but will only be set if
/// the pubkey has been verified.
/// Pubkey of the remote, if this connection is a curve25519 connection; empty otherwise.
std::string pubkey;
/// True if we've authenticated this peer as a service node.
/// True if we've authenticated this peer as a service node. This gets set on incoming
/// messages when we check the remote's pubkey, and immediately on outgoing connections to
/// SNs (since we know their pubkey -- we'll fail to connect if it doesn't match).
bool service_node = false;
/// The auth level of this peer
/// The auth level of this peer, as returned by the AllowFunc for incoming connections or
/// specified during outgoing connections.
AuthLevel auth_level = AuthLevel::none;
/// Will be set to a non-empty routing prefix if if we have (or at least recently had) an
/// established incoming connection with this peer. Will be empty if there is no incoming
/// connection.
std::string incoming;
/// The actual internal socket index through which this connection is established
size_t conn_index;
/// The index in `remotes` if we have an established outgoing connection to this peer, -1 if
/// we have no outgoing connection to this peer.
int outgoing = -1;
/// Will be set to a non-empty routing prefix *if* one is necessary on the connection. This
/// is used only for SN peers (non-SN incoming connections don't have a peer_info record,
/// and outgoing connections don't have a route).
std::string route;
/// The last time we sent or received a message (or had some other relevant activity) with
/// this peer. Used for closing outgoing connections that have reached an inactivity expiry
/// time.
/// Returns true if this is an outgoing connection. (This is simply an alias for
/// route.empty() -- outgoing connections never have a route, incoming connections always
/// do).
bool outgoing() const { return route.empty(); }
/// The last time we sent or received a message (or had some other relevant activity) on
/// this connection. Used for closing outgoing connections that have reached an inactivity
/// expiry time (closing inactive conns for incoming connections is done by the other end).
std::chrono::steady_clock::time_point last_activity;
/// Updates last_activity to the current time
void activity() { last_activity = std::chrono::steady_clock::now(); }
/// After more than this much inactivity we will close an idle connection
/// After more than this much inactivity we will close an idle (outgoing) connection
std::chrono::milliseconds idle_expiry;
};
/// Currently peer connections: id -> peer_info. id == pubkey for incoming and outgoing SN
/// connections; random string for outgoing direct connections.
std::unordered_map<std::string, peer_info> peers;
/// Currently peer connections: id -> peer_info. The ID is as returned by connect_remote or a
/// SN pubkey string.
std::unordered_multimap<ConnectionID, peer_info> peers;
/// Maps connection indices (which can change) to ConnectionIDs (which are permanent).
std::vector<ConnectionID> conn_index_to_id;
/// Maps listening socket ConnectionIDs to connection index values (these don't have peers
/// entries)
std::unordered_map<ConnectionID, size_t> incoming_conn_index;
/// The next ConnectionID value we should use (for non-SN connections).
std::atomic<long long> next_conn_id{1};
/// Remotes we are still trying to connect to (via connect_remote(), not connect_sn()); when
/// we pass handshaking we move them out of here and (if set) trigger the on_connect callback.
/// Unlike regular node-to-node peers, these have an extra "HI"/"HELLO" sequence that we used
/// before we consider ourselves connected to the remote.
std::vector<std::tuple<int /*remotes index*/, std::chrono::steady_clock::time_point, ConnectSuccess, ConnectFailure>> pending_connects;
std::vector<std::tuple<size_t /*conn_index*/, long long /*conn_id*/, std::chrono::steady_clock::time_point, ConnectSuccess, ConnectFailure>> pending_connects;
/// Pending requests that have been sent out but not yet received a matching "REPLY". The value
/// is the timeout timestamp.
@ -329,26 +419,22 @@ private:
pending_requests;
/// different polling sockets the proxy handler polls: this always contains some internal
/// sockets for inter-thread communication followed by listener socket and a pollitem for every
/// (outgoing) remote socket in `remotes`. This must be in a sequential vector because of zmq
/// requirements (otherwise it would be far nicer to not have to synchronize the two vectors).
/// sockets for inter-thread communication followed by a pollitem for every connection (both
/// incoming and outgoing) in `connections`. We rebuild this from `connections` whenever
/// `pollitems_stale` is set to true.
std::vector<zmq::pollitem_t> pollitems;
/// Properly adds a socket to poll for input to pollitems
void add_pollitem(zmq::socket_t& sock);
/// If set then rebuild pollitems before the next poll (set when establishing new connections or
/// closing existing ones).
bool pollitems_stale = true;
/// The number of internal sockets in `pollitems`
static constexpr size_t poll_internal_size = 3;
/// Rebuilds pollitems to include the internal sockets + all incoming/outgoing sockets.
void rebuild_pollitems();
/// The pollitems location corresponding to `remotes[0]`.
const size_t poll_remote_offset; // will be poll_internal_size + 1 for a full listener (the +1 is the listening socket); poll_internal_size for a remote-only
/// The outgoing remote connections we currently have open along with the remote pubkeys. Each
/// The connections to/from remotes we currently have open, both listening and outgoing. Each
/// element [i] here corresponds to an the pollitem_t at pollitems[i+1+poll_internal_size].
/// (Ideally we'd use one structure, but zmq requires the pollitems be in contiguous storage).
/// For new connections established via connect_remote the pubkey will be empty until we
/// do the HI/HELLO handshake over the socket.
std::vector<std::pair<std::string, zmq::socket_t>> remotes;
std::vector<zmq::socket_t> connections;
/// Socket we listen on to receive control messages in the proxy thread. Each thread has its own
/// internal "control" connection (returned by `get_control_socket()`) to this socket used to
@ -412,7 +498,7 @@ private:
/// be done in the proxy thread anyway (if we forwarded to a worker the worker would just have
/// to send an instruction back to the proxy to do it). Returns true if one was handled, false
/// to continue with sending to a worker.
bool proxy_handle_builtin(int conn_index, std::vector<zmq::message_t>& parts);
bool proxy_handle_builtin(size_t conn_index, std::vector<zmq::message_t>& parts);
struct run_info;
/// Gets an idle worker's run_info and removes the worker from the idle worker list. If there
@ -434,24 +520,45 @@ private:
/// gets called after all works have done so.
void proxy_quit();
// Sets the various properties on an outgoing socket prior to connection.
// Sets the various properties for a listening socket prior to binding. If curve is true then
// the socket is set up using the keys and incoming connections must already know the pubkey to
// establish a connection; otherwise the connection is plaintext without authentication.
void setup_listening_socket(zmq::socket_t& socket, bool curve);
// Sets the various properties on an outgoing socket prior to connection. If remote_pubkey is
// provided then the connection will be curve25519 encrypted and authenticate; otherwise it will
// be unencrypted and unauthenticated. Note that the remote end must be in the same mode (i.e.
// either accepting curve connections, or not accepting curve).
void setup_outgoing_socket(zmq::socket_t& socket, string_view remote_pubkey = {});
/// Common connection implementation used by proxy_connect/proxy_send. Returns the socket
/// and, if a routing prefix is needed, the required prefix (or an empty string if not needed).
/// For an optional connect that fail, returns nullptr for the socket.
std::pair<zmq::socket_t*, std::string> proxy_connect_sn(string_view pubkey, string_view connect_hint, bool optional, bool incoming_only, std::chrono::milliseconds keep_alive);
///
/// @param pubkey the pubkey to connect to
/// @param connect_hint if we need a new connection and this is non-empty then we *may* use it
/// instead of doing a call to `sn_lookup()`.
/// @param optional if we don't already have a connection then don't establish a new one
/// @param incoming_only only relay this if we have an established incoming connection from the
/// given SN, otherwise don't connect (like `optional`)
/// @param keep_alive the keep alive for the connection, if we establish a new outgoing
/// connection. If we already have an outgoing connection then its keep-alive gets increased to
/// this if currently less than this.
std::pair<zmq::socket_t*, std::string> proxy_connect_sn(string_view pubkey, string_view connect_hint,
bool optional, bool incoming_only, std::chrono::milliseconds keep_alive);
/// CONNECT_SN command telling us to connect to a new pubkey. Returns the socket (which could be
/// existing or a new one).
std::pair<zmq::socket_t*, std::string> proxy_connect_sn(bt_dict&& data);
/// CONNECT_SN command telling us to connect to a new pubkey. Returns the socket (which could
/// be existing or a new one). This basically just unpacks arguments and passes them on to
/// proxy_connect_sn().
std::pair<zmq::socket_t*, std::string> proxy_connect_sn(bt_dict_consumer data);
/// Opens a new connection to a remote, with callbacks. This is the proxy-side implementation
/// of the `connect_remote()` call.
void proxy_connect_remote(bt_dict_consumer data);
/// Called to disconnect our remote connection to the given pubkey (if we have one).
void proxy_disconnect(const std::string& pubkey);
/// Called to disconnect our remote connection to the given id (if we have one).
void proxy_disconnect(bt_dict_consumer data);
void proxy_disconnect(ConnectionID conn, std::chrono::milliseconds linger);
/// SEND command. Does a connect first, if necessary.
void proxy_send(bt_dict_consumer data);
@ -480,10 +587,9 @@ private:
/// Same, but deserialized
void proxy_timer(std::function<void()> job, std::chrono::milliseconds interval, bool squelch);
/// ZAP (https://rfc.zeromq.org/spec:27/ZAP/) authentication handler; this is called with the
/// zap auth socket to do non-blocking processing of any waiting authentication requests waiting
/// on it to verify whether the connection is from a valid/allowed SN.
void process_zap_requests(zmq::socket_t& zap_auth);
/// ZAP (https://rfc.zeromq.org/spec:27/ZAP/) authentication handler; this does non-blocking
/// processing of any waiting authentication requests for new incoming connections.
void process_zap_requests();
/// Handles a control message from some outer thread to the proxy
void proxy_control_message(std::vector<zmq::message_t>& parts);
@ -493,7 +599,7 @@ private:
void proxy_expire_idle_peers();
/// Helper method to actually close a remote connection and update the stuff that needs updating.
void proxy_close_remote(int removed, bool linger = true);
void proxy_close_connection(size_t removed, std::chrono::milliseconds linger);
/// Closes an outgoing connection immediately, updates internal variables appropriately.
/// Returns the next iterator (the original may or may not be removed from peers, depending on
@ -527,7 +633,7 @@ private:
/// Checks a peer's authentication level. Returns true if allowed, warns and returns false if
/// not.
bool proxy_check_auth(string_view pubkey, size_t conn_index, const peer_info& peer,
bool proxy_check_auth(size_t conn_index, bool outgoing, const peer_info& peer,
const std::string& command, const category& cat, zmq::message_t& msg);
/// Details for a pending command; such a command already has authenticated access and is just
@ -537,12 +643,13 @@ private:
std::string command;
std::vector<zmq::message_t> data_parts;
const std::pair<CommandCallback, bool>* callback;
std::string pubkey;
std::string id;
bool service_node;
ConnectionID conn;
std::string conn_route;
pending_command(category& cat, std::string command, std::vector<zmq::message_t> data_parts, const std::pair<CommandCallback, bool>* callback, std::string pubkey, bool service_node)
: cat{cat}, command{std::move(command)}, data_parts{std::move(data_parts)}, callback{callback}, pubkey{std::move(pubkey)}, service_node{service_node} {}
pending_command(category& cat, std::string command, std::vector<zmq::message_t> data_parts,
const std::pair<CommandCallback, bool>* callback, ConnectionID conn, std::string conn_route)
: cat{cat}, command{std::move(command)}, data_parts{std::move(data_parts)},
callback{callback}, conn{std::move(conn)}, conn_route{std::move(conn_route)} {}
};
std::list<pending_command> pending_commands;
@ -560,8 +667,8 @@ private:
// these shouldn't be accessed and likely contain stale data).
category *cat;
std::string command;
std::string pubkey;
bool service_node = false;
ConnectionID conn; // The connection (or SN pubkey) to reply on/to.
std::string conn_route; // if non-empty this is the reply routing prefix (for incoming connections)
std::vector<zmq::message_t> data_parts;
// If is_batch_job true then these are set (if is_batch_job false then don't access these!):
@ -573,9 +680,9 @@ private:
};
// These belong to the proxy thread and must not be accessed by a worker:
std::thread thread;
std::thread worker_thread;
size_t worker_id; // The index in `workers`
std::string routing_id; // "w123" where 123 == worker_id
std::string worker_routing_id; // "w123" where 123 == worker_id
/// Loads the run info with a pending command
run_info& operator=(pending_command&& pending);
@ -606,15 +713,13 @@ public:
* *capable* of being a service node, whether or not we are currently actively). If specified
* as true then the pubkey and privkey values must not be empty.
*
* @param bind list of addresses to bind to. Can be any string zmq supports; typically a tcp
* IP/port combination such as: "tcp://\*:4567" or "tcp://1.2.3.4:5678". Can be empty to not
* listen at all.
*
* @param peer_lookup function that takes a pubkey key (32-byte binary string) and returns a
* @param sn_lookup function that takes a pubkey key (32-byte binary string) and returns a
* connection string such as "tcp://1.2.3.4:23456" to which a connection should be established
* to reach that service node. Note that this function is only called if there is no existing
* connection to that service node, and that the function is never called for a connection to
* self (that uses an internal connection instead). Should return empty for not found.
* self (that uses an internal connection instead). Also note that the service node must be
* listening in curve25519 mode (otherwise we couldn't verify its authenticity). Should return
* empty for not found or if SN lookups are not supported.
*
* @param allow_incoming is a callback that LokiMQ can use to determine whether an incoming
* connection should be allowed at all and, if so, whether the connection is from a known
@ -629,21 +734,16 @@ public:
LokiMQ( std::string pubkey,
std::string privkey,
bool service_node,
std::vector<std::string> bind,
SNRemoteAddress peer_lookup,
AllowFunc allow_connection,
SNRemoteAddress sn_lookup,
Logger logger = [](LogLevel, const char*, int, std::string) { });
/**
* Simplified LokiMQ constructor for a client. This does not bind, generates ephemeral keys,
* and doesn't have peer_lookup capabilities, and treats all remotes as "basic", non-service
* node connections (for command authenication purposes).
* Simplified LokiMQ constructor for a simple listener without any SN connection/authentication
* capabilities. This treats all remotes as "basic", non-service node connections for command
* authentication purposes.
*/
explicit LokiMQ(Logger logger = [](LogLevel, const char*, int, std::string) { })
: LokiMQ("", "", false, {},
[](const auto&) { return std::string{}; },
[](string_view, string_view) { return Allow{AuthLevel::basic}; },
std::move(logger)) {}
: LokiMQ("", "", false, [](auto) { return ""s; /*no peer lookups*/ }, std::move(logger)) {}
/**
* Destructor; instructs the proxy to quit. The proxy tells all workers to quit, waits for them
@ -757,6 +857,33 @@ public:
*/
void start();
/** Start listening on the given bind address using curve authentication/encryption. Incoming
* connections will only be allowed from clients that already have the server's pubkey, and
* will be encrypted. `allow_connection` is invoked for any incoming connections on this
* address to determine the incoming remote's access and authentication level.
*
* @param bind address - can be any string zmq supports; typically a tcp IP/port combination
* such as: "tcp://\*:4567" or "tcp://1.2.3.4:5678".
*
* @param allow_connection function to call to determine whether to allow the connection and, if
* so, the authentication level it receives. If omitted the default returns non-service node,
* AuthLevel::none access.
*/
void listen_curve(std::string bind, AllowFunc allow_connection = [](auto, auto) { return Allow{AuthLevel::none, false}; });
/** Start listening on the given bind address in unauthenticated plain text mode. Incoming
* connections can come from anywhere. `allow_connection` is invoked for any incoming
* connections on this address to determine the incoming remote's access and authentication
* level. Note that `allow_connection` here will be called with an empty pubkey.
*
* @param bind address - can be any string zmq supports; typically a tcp IP/port combination
* such as: "tcp://\*:4567" or "tcp://1.2.3.4:5678".
*
* @param allow_connection function to call to determine whether to allow the connection and, if
* so, the authentication level it receives.
*/
void listen_plain(std::string bind, AllowFunc allow_connection);
/**
* Try to initiate a connection to the given SN in anticipation of needing a connection in the
* future. If a connection is already established, the connection's idle timer will be reset
@ -772,89 +899,120 @@ public:
* @param keep_alive - the connection will be kept alive if there was valid activity within
* the past `keep_alive` milliseconds. If an outgoing connection already
* exists, the longer of the existing and the given keep alive is used.
* Note that the default applied here is much longer than the default for an
* implicit connect() by calling send() directly.
* (Note that the default applied here is much longer than the default for an
* implicit connect() by calling send() directly.)
* @param hint - if non-empty and a new outgoing connection needs to be made this hint value
* may be used instead of calling the lookup function. (Note that there is no
* guarantee that the hint will be used; it is only usefully specified if the
* connection location has already been incidentally determined).
* connection address has already been incidentally determined).
*
* @returns a ConnectionID that identifies an connection with the given SN. Typically you
* *don't* need to worry about this (and can just discard it): you can always simply pass the
* pubkey as a string wherever a ConnectionID is called.
*/
void connect_sn(string_view pubkey, std::chrono::milliseconds keep_alive = 5min, string_view hint = {});
ConnectionID connect_sn(string_view pubkey, std::chrono::milliseconds keep_alive = 5min, string_view hint = {});
/**
* Establish a connection to the given remote with callbacks invoked on a successful or failed
* connection. The success callback gives you the pubkey of the remote, which can then be used
* to send commands to the remote (via `send()`). is generally intended for cases where the remote is
* being treated as the "server" and the local connection as a "client"; for connections between
* peers (i.e. between SNs) you generally want connect_sn() instead. If pubkey is non-empty
* then the remote must have that pubkey; if empty then any pubkey is allowed.
* connection. Returns a ConnectionID associated with the connection being attempted. It is
* possible to send to the remote before the successful callback is invoked, but there is no
* guarantee that the messages will be delivered (e.g. if the connection ultimately fails).
*
* Unlike `connect_sn`, the connection established here will be kept open
* indefinitely (until you call disconnect).
* For connections to a service node you generally want connect_sn() instead (which verifies
* that it is talking to the SN and encrypts the connection).
*
* Unlike `connect_sn`, the connection established here will be kept open indefinitely (until
* you call disconnect).
*
* The `on_connect` and `on_failure` callbacks are invoked when a connection has been
* established or failed to establish.
*
* @param remote the remote connection address, such as `tcp://localhost:1234`.
* @param on_connect called with the identifier and the remote's pubkey after the connection has
* been established and handshaked.
* @param on_failure called with a failure message if we fail to connect.
* @param pubkey the required remote pubkey (empty to accept any).
* @param on_connect called with the identifier after the connection has been established.
* @param on_failure called with the identifier and failure message if we fail to connect.
* @param pubkey if non-empty then connect securely (using curve encryption) and verify that the
* remote's pubkey equals the given value. Specifying this is similar to using connect_sn()
* except that we do not treat the remote as a SN for command authorization purposes.
* @param auth_level determines the authentication level of the remote for issuing commands to
* us. The default is `AuthLevel::none`.
* @param timeout how long to try before aborting the connection attempt and calling the
* on_failure callback. Note that the connection can fail for various reasons before the
* timeout.
*
* @param returns ConnectionID that uniquely identifies the connection to this remote node. In
* order to talk to it you will need the returned value (or a copy of it).
*/
void connect_remote(string_view remote, ConnectSuccess on_connect, ConnectFailure on_failure,
string_view pubkey = {}, std::chrono::milliseconds timeout = REMOTE_CONNECT_TIMEOUT);
ConnectionID connect_remote(string_view remote, ConnectSuccess on_connect, ConnectFailure on_failure,
string_view pubkey = {},
AuthLevel auth_level = AuthLevel::none,
std::chrono::milliseconds timeout = REMOTE_CONNECT_TIMEOUT);
/**
* Disconnects an established outgoing connection established with `connect_remote()`.
* Disconnects an established outgoing connection established with `connect_remote()` (or, less
* commonly, `connect_sn()`).
*
* @param id the connection id, as returned by `connect_remote()`.
* @param id the connection id, as returned by `connect_remote()` or the SN pubkey.
*
* @param linger how long to allow the connection to linger while there are still pending
* outbound messages to it before disconnecting and dropping any pending messages. (Note that
* this lingering is internal; the disconnect_remote() call does not block). The default is 1
* second.
*
* If given a pubkey, we try to close an outgoing connection to the given SN if one exists; note
* however that this is often not particularly useful as messages to that SN can immediately
* reopen the connection.
*/
void disconnect_remote(string_view id, std::chrono::milliseconds linger = 1s);
void disconnect(ConnectionID id, std::chrono::milliseconds linger = 1s);
/**
* Queue a message to be relayed to the node identified with the given identifier (for SNs and
* incoming connections this is a pubkey; for connections established with `connect()` this will
* be the opaque string returned by `connect()`), without expecting a reply. LokiMQ will
* attempt to relay the message (first connecting and handshaking if not already connected
* and the given pubkey is a service node's pubkey).
* Queue a message to be relayed to the given service node or remote without requiring a reply.
* LokiMQ will attempt to relay the message (first connecting and handshaking to the remote SN
* if not already connected).
*
* If a new connection is established it will have a relatively short (30s) idle timeout. If
* the connection should stay open longer you should call `connect(pubkey, IDLETIME)` first.
* the connection should stay open longer you should either call `connect(pubkey, IDLETIME)` or
* pass a a `send_option::keep_alive{IDLETIME}` in `opts`.
*
* Note that this method (along with connect) doesn't block waiting for a connection or for the
* message to send; it merely instructs the proxy thread that it should send. ZMQ will
* generally try hard to deliver it (reconnecting if the connection fails), but if the
* connection fails persistently the message will eventually be dropped.
*
* @param id - the pubkey or identifier returned by `connect()` to send this to
* @param remote - either a ConnectionID value returned by connect_remote, or a service node
* pubkey string. In the latter case, sending the message may trigger a new
* connection being established to the service node (i.e. you do not have to
* call connect() first).
* @param cmd - the first data frame value which is almost always the remote "category.command" name
* @param opts - any number of std::string and send options. Each send option affects
* how the send works; each string becomes a serialized message part.
* @param opts - any number of std::string (or string_views) and send options. Each send option
* affects how the send works; each string becomes a message part.
*
* Example:
*
* lmq.send(pubkey, "hello", "abc", send_option::hint("tcp://localhost:1234"), "def");
* // Send to a SN, connecting to it if we aren't already connected:
* lmq.send(pubkey, "hello.world", "abc", send_option::hint("tcp://localhost:1234"), "def");
*
* sends the command `hello` to the given pubkey, containing additional message parts "abc" and
* "def", and, if not currently connected, using the given connection hint rather than
* performing a connection address lookup on the pubkey.
* // Start connecting to a remote and immediately queue a message for it
* auto conn = lmq.connect_remote("tcp://127.0.0.1:1234",
* [](ConnectionID) { std::cout << "connected\n"; },
* [](ConnectionID, string_view why) { std::cout << "connection failed: " << why << \n"; });
* lmq.send(conn, "hello.world", "abc", "def");
*
* Both of these send the command `hello.world` to the given pubkey, containing additional
* message parts "abc" and "def". In the first case, if not currently connected, the given
* connection hint may be used rather than performing a connection address lookup on the pubkey.
*/
template <typename... T>
void send(string_view pubkey, string_view cmd, const T&... opts);
void send(ConnectionID to, string_view cmd, const T&... opts);
/** Send a command configured as a "REQUEST" command: the data parts will be prefixed with a
* random identifier. The remote is expected to reply with a ["REPLY", <identifier>, ...]
* message, at which point we invoke the given callback with any [...] parts of the reply.
/** Send a command configured as a "REQUEST" command to a service node: the data parts will be
* prefixed with a random identifier. The remote is expected to reply with a ["REPLY",
* <identifier>, ...] message, at which point we invoke the given callback with any [...] parts
* of the reply.
*
* @param pubkey - the pubkey to send this request to
* Like `send()`, a new connection to the service node will be established if not already
* connected.
*
* @param to - the pubkey string or ConnectionID to send this request to
* @param cmd - the command name
* @param callback - the callback to invoke when we get a reply. Called with a true value and
* the data strings when a reply is received, or false and an empty vector of data parts if we
@ -862,7 +1020,7 @@ public:
* @param opts - anything else (i.e. strings, send_options) is forwarded to send().
*/
template <typename... T>
void request(string_view pubkey, string_view cmd, ReplyCallback callback, const T&... opts);
void request(ConnectionID to, string_view cmd, ReplyCallback callback, const T&... opts);
/// The key pair this LokiMQ was created with; if empty keys were given during construction then
/// this returns the generated keys.
@ -879,7 +1037,7 @@ public:
/**
* Queues a single job to be executed with no return value. This is a shortcut for creating and
* submitting a single-job, no-completion batch.
* submitting a single-job, no-completion-function batch job.
*/
void job(std::function<void()> f);
@ -928,7 +1086,15 @@ struct incoming {};
/// for the send and its current idle timeout setting is less than this value then it is updated.
struct keep_alive {
std::chrono::milliseconds time;
keep_alive(std::chrono::milliseconds time) : time{std::move(time)} {}
explicit keep_alive(std::chrono::milliseconds time) : time{std::move(time)} {}
};
/// Specifies a routing prefix to be used. This option is required (and added automatically by
/// Message::send and ::reply) when a message is being sent to a non-SN connection on a listening
/// socket, and has no effect otherwise.
struct route {
std::string routing_prefix;
explicit route(std::string r) : routing_prefix{std::move(r)} {}
};
}
@ -961,7 +1127,7 @@ inline void apply_send_option(bt_list&, bt_dict& control_data, const send_option
control_data["optional"] = 1;
}
/// `incoming` specialization: sets the optional flag in the control data
/// `incoming` specialization: sets the incoming-only flag in the control data
inline void apply_send_option(bt_list&, bt_dict& control_data, const send_option::incoming &) {
control_data["incoming"] = 1;
}
@ -971,19 +1137,28 @@ inline void apply_send_option(bt_list&, bt_dict& control_data, const send_option
control_data["keep-alive"] = timeout.time.count();
}
/// `route` specialization: adds a routing prefix to be used when sending a non-SN message on an
/// incoming socket.
inline void apply_send_option(bt_list&, bt_dict& control_data, const send_option::route& route) {
control_data["conn_route"] = route.routing_prefix;
}
} // namespace detail
template <typename... T>
void LokiMQ::send(string_view pubkey, string_view cmd, const T &...opts) {
void LokiMQ::send(ConnectionID to, string_view cmd, const T &...opts) {
bt_dict control_data;
bt_list parts{{cmd}};
bt_list parts{{std::move(cmd)}};
#ifdef __cpp_fold_expressions
(detail::apply_send_option(parts, control_data, opts),...);
#else
(void) std::initializer_list<int>{(detail::apply_send_option(parts, control_data, opts), 0)...};
#endif
control_data["pubkey"] = pubkey;
if (to.sn())
control_data["pubkey"] = std::move(to.pk);
else
control_data["conn_id"] = to.id;
control_data["send"] = std::move(parts);
detail::send_control(get_control_socket(), "SEND", bt_serialize(control_data));
}
@ -991,17 +1166,21 @@ void LokiMQ::send(string_view pubkey, string_view cmd, const T &...opts) {
std::string make_random_string(size_t size);
template <typename... T>
void LokiMQ::request(string_view pubkey, string_view cmd, ReplyCallback callback, const T &...opts) {
auto reply_tag = make_random_string(15); // 15 should keep us in most stl implementations' small string optimization
void LokiMQ::request(ConnectionID to, string_view cmd, ReplyCallback callback, const T &...opts) {
auto reply_tag = make_random_string(15); // 15 random bytes is lots and should keep us in most stl implementations' small string optimization
bt_dict control_data;
bt_list parts{{cmd, reply_tag}};
bt_list parts{{std::move(cmd), reply_tag}};
#ifdef __cpp_fold_expressions
(detail::apply_send_option(parts, control_data, opts),...);
#else
(void) std::initializer_list<int>{(detail::apply_send_option(parts, control_data, opts), 0)...};
#endif
control_data["pubkey"] = pubkey;
if (to.sn())
control_data["pubkey"] = std::move(to.pk);
else
control_data["conn_id"] = to.id;
control_data["send"] = std::move(parts);
control_data["request"] = true;
control_data["request_callback"] = reinterpret_cast<uintptr_t>(new ReplyCallback{std::move(callback)});
@ -1011,20 +1190,23 @@ void LokiMQ::request(string_view pubkey, string_view cmd, ReplyCallback callback
template <typename... Args>
void Message::send_back(string_view command, Args&&... args) {
assert(reply_tag.empty());
if (service_node) lokimq.send(pubkey, command, std::forward<Args>(args)...);
else lokimq.send(pubkey, command, send_option::optional{}, std::forward<Args>(args)...);
if (conn.sn()) lokimq.send(conn, command, std::forward<Args>(args)...);
else lokimq.send(conn, command, send_option::route{route}, send_option::optional{}, std::forward<Args>(args)...);
}
template <typename... Args>
void Message::send_reply(Args&&... args) {
assert(!reply_tag.empty());
if (service_node) lokimq.send(pubkey, "REPLY", reply_tag, std::forward<Args>(args)...);
else lokimq.send(pubkey, "REPLY", reply_tag, send_option::optional{}, std::forward<Args>(args)...);
if (conn.sn()) lokimq.send(conn, "REPLY", reply_tag, std::forward<Args>(args)...);
else lokimq.send(conn, "REPLY", reply_tag, send_option::route{route}, send_option::optional{}, std::forward<Args>(args)...);
}
template <typename ReplyCallback, typename... Args>
void Message::send_request(string_view cmd, ReplyCallback&& callback, Args&&... args) {
if (conn.sn()) lokimq.request(conn, cmd, std::forward<ReplyCallback>(callback), std::forward<Args>(args)...);
else lokimq.request(conn, cmd, std::forward<ReplyCallback>(callback),
send_option::route{route}, send_option::optional{}, std::forward<Args>(args)...);
}
template <typename... T>
void LokiMQ::log_(LogLevel lvl, const char* file, int line, const T&... stuff) {
@ -1042,6 +1224,6 @@ void LokiMQ::log_(LogLevel lvl, const char* file, int line, const T&... stuff) {
std::ostream &operator<<(std::ostream &os, LogLevel lvl);
}
} // namespace lokimq
// vim:sw=4:et

View File

@ -44,9 +44,7 @@ TEST_CASE("batching many small jobs", "[batch-many]") {
lokimq::LokiMQ lmq{
"", "", // generate ephemeral keys
false, // not a service node
{}, // don't listen
[](auto) { return ""; },
[](auto ip, auto pk) { return lokimq::Allow{lokimq::AuthLevel::none, false}; },
};
lmq.set_general_threads(4);
lmq.set_batch_threads(4);
@ -62,9 +60,7 @@ TEST_CASE("batch exception propagation", "[batch-exceptions]") {
lokimq::LokiMQ lmq{
"", "", // generate ephemeral keys
false, // not a service node
{}, // don't listen
[](auto) { return ""; },
[](auto ip, auto pk) { return lokimq::Allow{lokimq::AuthLevel::none, false}; },
};
lmq.set_general_threads(4);
lmq.set_batch_threads(4);

View File

@ -1,5 +1,6 @@
#include "common.h"
#include <future>
#include <lokimq/hex.h>
using namespace lokimq;
@ -8,12 +9,11 @@ TEST_CASE("basic commands", "[commands]") {
LokiMQ server{
"", "", // generate ephemeral keys
false, // not a service node
{listen},
[](auto &) { return ""; },
[](auto /*ip*/, auto /*pk*/) { return Allow{AuthLevel::none, false}; },
[](auto) { return ""; },
get_logger("")
};
server.log_level(LogLevel::trace);
server.listen_curve(listen, [](auto /*ip*/, auto /*pk*/) { return Allow{AuthLevel::none, false}; });
std::atomic<int> hellos{0}, his{0};
@ -25,7 +25,7 @@ TEST_CASE("basic commands", "[commands]") {
});
std::string client_pubkey;
server.add_command("public", "client.pubkey", [&](Message& m) {
client_pubkey = std::string{m.pubkey};
client_pubkey = std::string{m.conn.pubkey()};
});
server.start();
@ -42,9 +42,9 @@ TEST_CASE("basic commands", "[commands]") {
std::atomic<bool> connected{false}, failed{false};
std::string pubkey;
client.connect_remote(listen,
[&](std::string pk) { pubkey = std::move(pk); connected = true; },
[&](string_view) { failed = true; },
auto c = client.connect_remote(listen,
[&](auto conn) { pubkey = conn.pubkey(); connected = true; },
[&](auto conn, string_view) { failed = true; },
server.get_pubkey());
int i;
@ -56,18 +56,18 @@ TEST_CASE("basic commands", "[commands]") {
REQUIRE( connected.load() );
REQUIRE( i <= 1 ); // should be fast
REQUIRE( !failed.load() );
REQUIRE( pubkey == server.get_pubkey() );
REQUIRE( to_hex(pubkey) == to_hex(server.get_pubkey()) );
client.send(pubkey, "public.hello");
client.send(pubkey, "public.client.pubkey");
client.send(c, "public.hello");
client.send(c, "public.client.pubkey");
std::this_thread::sleep_for(50ms);
REQUIRE( hellos == 1 );
REQUIRE( his == 1 );
REQUIRE( client_pubkey == client.get_pubkey() );
REQUIRE( to_hex(client_pubkey) == to_hex(client.get_pubkey()) );
for (int i = 0; i < 50; i++)
client.send(pubkey, "public.hello");
client.send(c, "public.hello");
std::this_thread::sleep_for(100ms);
REQUIRE( hellos == 51 );

View File

@ -5,18 +5,17 @@ extern "C" {
}
TEST_CASE("connections", "[connect][curve]") {
TEST_CASE("connections with curve authentication", "[curve][connect]") {
std::string listen = "tcp://127.0.0.1:4455";
LokiMQ server{
"", "", // generate ephemeral keys
false, // not a service node
{listen},
[](auto) { return ""; },
[](auto /*ip*/, auto /*pk*/) { return Allow{AuthLevel::none, false}; },
get_logger("")
};
server.log_level(LogLevel::trace);
server.listen_curve(listen, [](auto /*ip*/, auto /*pk*/) { return Allow{AuthLevel::none, false}; });
server.add_category("public", Access{AuthLevel::none});
server.add_request_command("public", "hello", [&](Message& m) { m.send_reply("hi"); });
server.start();
@ -28,9 +27,9 @@ TEST_CASE("connections", "[connect][curve]") {
auto pubkey = server.get_pubkey();
std::atomic<int> connected{0};
client.connect_remote(listen,
[&](std::string pk) { connected = 1; },
[&](string_view reason) { auto lock = catch_lock(); INFO("connection failed: " << reason); },
auto server_conn = client.connect_remote(listen,
[&](auto conn) { connected = 1; },
[&](auto conn, string_view reason) { auto lock = catch_lock(); INFO("connection failed: " << reason); },
pubkey);
int i;
@ -44,7 +43,7 @@ TEST_CASE("connections", "[connect][curve]") {
bool success = false;
std::vector<std::string> parts;
client.request(pubkey, "public.hello", [&](auto success_, auto parts_) { success = success_; parts = parts_; });
client.request(server_conn, "public.hello", [&](auto success_, auto parts_) { success = success_; parts = parts_; });
std::this_thread::sleep_for(50ms);
REQUIRE( success );
@ -58,19 +57,18 @@ TEST_CASE("self-connection SN optimization", "[connect][self]") {
LokiMQ sn{
pubkey, privkey,
true,
{"tcp://127.0.0.1:5544"},
[&](auto pk) { if (pk == pubkey) return "tcp://127.0.0.1:5544"; else return ""; },
[&](auto ip, auto pk) { REQUIRE(ip == "127.0.0.1"); return Allow{AuthLevel::none, pk == pubkey}; },
get_logger("")
};
sn.listen_curve("tcp://127.0.0.1:5544", [&](auto ip, auto pk) { REQUIRE(ip == "127.0.0.1"); return Allow{AuthLevel::none, pk == pubkey}; });
sn.add_category("a", Access{AuthLevel::none});
bool invoked = false;
sn.add_command("a", "b", [&](const Message& m) {
invoked = true;
auto lock = catch_lock();
REQUIRE(m.pubkey == pubkey);
REQUIRE(m.service_node);
REQUIRE(m.conn.sn());
REQUIRE(m.conn.pubkey() == pubkey);
REQUIRE(!m.data.empty());
REQUIRE(m.data[0] == "my data");
});
@ -82,3 +80,44 @@ TEST_CASE("self-connection SN optimization", "[connect][self]") {
std::this_thread::sleep_for(50ms);
REQUIRE(invoked);
}
TEST_CASE("plain-text connections", "[plaintext][connect]") {
std::string listen = "tcp://127.0.0.1:4455";
LokiMQ server{get_logger("")};
server.log_level(LogLevel::trace);
server.add_category("public", Access{AuthLevel::none});
server.add_request_command("public", "hello", [&](Message& m) { m.send_reply("hi"); });
server.listen_plain(listen, [](auto /*ip*/, auto /*pk*/) { return Allow{AuthLevel::none, false}; });
server.start();
LokiMQ client{get_logger("")};
client.log_level(LogLevel::trace);
client.start();
std::atomic<int> connected{0};
auto c = client.connect_remote(listen,
[&](auto conn) { connected = 1; },
[&](auto conn, string_view reason) { auto lock = catch_lock(); INFO("connection failed: " << reason); }
);
int i;
for (i = 0; i < 5; i++) {
if (connected.load())
break;
std::this_thread::sleep_for(50ms);
}
REQUIRE( i <= 1 );
REQUIRE( connected.load() );
bool success = false;
std::vector<std::string> parts;
client.request(c, "public.hello", [&](auto success_, auto parts_) { success = success_; parts = parts_; });
std::this_thread::sleep_for(50ms);
REQUIRE( success );
}

View File

@ -1,5 +1,6 @@
#include "common.h"
#include <future>
#include <lokimq/hex.h>
using namespace lokimq;
@ -8,10 +9,9 @@ TEST_CASE("basic requests", "[requests]") {
LokiMQ server{
"", "", // generate ephemeral keys
false, // not a service node
{listen},
[](auto &) { return ""; },
[](auto /*ip*/, auto /*pk*/) { return Allow{AuthLevel::none, false}; },
[](auto) { return ""; },
};
server.listen_curve(listen);
std::atomic<int> hellos{0}, his{0};
@ -31,9 +31,9 @@ TEST_CASE("basic requests", "[requests]") {
std::atomic<bool> connected{false}, failed{false};
std::string pubkey;
client.connect_remote(listen,
[&](std::string pk) { pubkey = std::move(pk); connected = true; },
[&](string_view) { failed = true; },
auto c = client.connect_remote(listen,
[&](auto conn) { pubkey = conn.pubkey(); connected = true; },
[&](auto, auto) { failed = true; },
server.get_pubkey());
int i;
@ -45,12 +45,70 @@ TEST_CASE("basic requests", "[requests]") {
REQUIRE( connected.load() );
REQUIRE( !failed.load() );
REQUIRE( i <= 1 );
REQUIRE( pubkey == server.get_pubkey() );
REQUIRE( to_hex(pubkey) == to_hex(server.get_pubkey()) );
std::atomic<bool> got_reply{false};
bool success;
std::vector<std::string> data;
client.request(pubkey, "public.hello", [&](bool ok, std::vector<std::string> data_) {
client.request(c, "public.hello", [&](bool ok, std::vector<std::string> data_) {
got_reply = true;
success = ok;
data = std::move(data_);
});
std::this_thread::sleep_for(50ms);
REQUIRE( got_reply.load() );
REQUIRE( success );
REQUIRE( data == std::vector<std::string>{{"123"}} );
}
TEST_CASE("request from server to client", "[requests]") {
std::string listen = "tcp://127.0.0.1:5678";
LokiMQ server{
"", "", // generate ephemeral keys
false, // not a service node
[](auto) { return ""; },
};
server.listen_curve(listen);
std::atomic<int> hellos{0}, his{0};
server.add_category("public", Access{AuthLevel::none});
server.add_request_command("public", "hello", [&](Message& m) {
m.send_reply("123");
});
server.start();
LokiMQ client(
[](LogLevel, const char* file, int line, std::string msg) { std::cerr << file << ":" << line << " --C-- " << msg << "\n"; }
);
//client.log_level(LogLevel::trace);
client.start();
std::atomic<bool> connected{false}, failed{false};
std::string pubkey;
auto c = client.connect_remote(listen,
[&](auto conn) { pubkey = conn.pubkey(); connected = true; },
[&](auto, auto) { failed = true; },
server.get_pubkey());
int i;
for (i = 0; i < 5; i++) {
if (connected.load())
break;
std::this_thread::sleep_for(50ms);
}
REQUIRE( connected.load() );
REQUIRE( !failed.load() );
REQUIRE( i <= 1 );
REQUIRE( to_hex(pubkey) == to_hex(server.get_pubkey()) );
std::atomic<bool> got_reply{false};
bool success;
std::vector<std::string> data;
client.request(c, "public.hello", [&](bool ok, std::vector<std::string> data_) {
got_reply = true;
success = ok;
data = std::move(data_);