mirror of
https://github.com/oxen-io/oxen-mq.git
synced 2023-12-13 21:00:31 +01:00
commit
51754037ea
|
@ -16,15 +16,13 @@ cmake_minimum_required(VERSION 3.7)
|
||||||
# Has to be set before `project()`, and ignored on non-macos:
|
# Has to be set before `project()`, and ignored on non-macos:
|
||||||
set(CMAKE_OSX_DEPLOYMENT_TARGET 10.12 CACHE STRING "macOS deployment target (Apple clang only)")
|
set(CMAKE_OSX_DEPLOYMENT_TARGET 10.12 CACHE STRING "macOS deployment target (Apple clang only)")
|
||||||
|
|
||||||
project(liboxenmq CXX C)
|
project(liboxenmq
|
||||||
|
VERSION 1.2.5
|
||||||
|
LANGUAGES CXX C)
|
||||||
|
|
||||||
include(GNUInstallDirs)
|
include(GNUInstallDirs)
|
||||||
|
|
||||||
set(OXENMQ_VERSION_MAJOR 1)
|
message(STATUS "oxenmq v${PROJECT_VERSION}")
|
||||||
set(OXENMQ_VERSION_MINOR 2)
|
|
||||||
set(OXENMQ_VERSION_PATCH 4)
|
|
||||||
set(OXENMQ_VERSION "${OXENMQ_VERSION_MAJOR}.${OXENMQ_VERSION_MINOR}.${OXENMQ_VERSION_PATCH}")
|
|
||||||
message(STATUS "oxenmq v${OXENMQ_VERSION}")
|
|
||||||
|
|
||||||
set(OXENMQ_LIBVERSION 0)
|
set(OXENMQ_LIBVERSION 0)
|
||||||
|
|
||||||
|
|
|
@ -5,7 +5,7 @@ includedir=@CMAKE_INSTALL_FULL_INCLUDEDIR@
|
||||||
|
|
||||||
Name: liblokimq
|
Name: liblokimq
|
||||||
Description: ZeroMQ-based communication library (compatibility package for liboxenmq)
|
Description: ZeroMQ-based communication library (compatibility package for liboxenmq)
|
||||||
Version: @OXENMQ_VERSION@
|
Version: @PROJECT_VERSION@
|
||||||
|
|
||||||
Libs: -L${libdir} -loxenmq
|
Libs: -L${libdir} -loxenmq
|
||||||
Libs.private: @PRIVATE_LIBS@
|
Libs.private: @PRIVATE_LIBS@
|
||||||
|
|
|
@ -5,7 +5,7 @@ includedir=@CMAKE_INSTALL_FULL_INCLUDEDIR@
|
||||||
|
|
||||||
Name: liboxenmq
|
Name: liboxenmq
|
||||||
Description: ZeroMQ-based communication library
|
Description: ZeroMQ-based communication library
|
||||||
Version: @OXENMQ_VERSION@
|
Version: @PROJECT_VERSION@
|
||||||
|
|
||||||
Libs: -L${libdir} -loxenmq
|
Libs: -L${libdir} -loxenmq
|
||||||
Libs.private: @PRIVATE_LIBS@
|
Libs.private: @PRIVATE_LIBS@
|
||||||
|
|
|
@ -47,7 +47,7 @@ void OxenMQ::setup_external_socket(zmq::socket_t& socket) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void OxenMQ::setup_outgoing_socket(zmq::socket_t& socket, std::string_view remote_pubkey) {
|
void OxenMQ::setup_outgoing_socket(zmq::socket_t& socket, std::string_view remote_pubkey, bool use_ephemeral_routing_id) {
|
||||||
|
|
||||||
setup_external_socket(socket);
|
setup_external_socket(socket);
|
||||||
|
|
||||||
|
@ -57,7 +57,7 @@ void OxenMQ::setup_outgoing_socket(zmq::socket_t& socket, std::string_view remot
|
||||||
socket.set(zmq::sockopt::curve_secretkey, privkey);
|
socket.set(zmq::sockopt::curve_secretkey, privkey);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (PUBKEY_BASED_ROUTING_ID) {
|
if (!use_ephemeral_routing_id) {
|
||||||
std::string routing_id;
|
std::string routing_id;
|
||||||
routing_id.reserve(33);
|
routing_id.reserve(33);
|
||||||
routing_id += 'L'; // Prefix because routing id's starting with \0 are reserved by zmq (and our pubkey might start with \0)
|
routing_id += 'L'; // Prefix because routing id's starting with \0 are reserved by zmq (and our pubkey might start with \0)
|
||||||
|
@ -67,39 +67,18 @@ void OxenMQ::setup_outgoing_socket(zmq::socket_t& socket, std::string_view remot
|
||||||
// else let ZMQ pick a random one
|
// else let ZMQ pick a random one
|
||||||
}
|
}
|
||||||
|
|
||||||
ConnectionID OxenMQ::connect_sn(std::string_view pubkey, std::chrono::milliseconds keep_alive, std::string_view hint) {
|
// Deprecated versions:
|
||||||
if (!proxy_thread.joinable())
|
ConnectionID OxenMQ::connect_remote(std::string_view remote, ConnectSuccess on_connect,
|
||||||
throw std::logic_error("Cannot call connect_sn() before calling `start()`");
|
ConnectFailure on_failure, AuthLevel auth_level, std::chrono::milliseconds timeout) {
|
||||||
|
return connect_remote(address{remote}, std::move(on_connect), std::move(on_failure),
|
||||||
detail::send_control(get_control_socket(), "CONNECT_SN", bt_serialize<bt_dict>({{"pubkey",pubkey}, {"keep_alive",keep_alive.count()}, {"hint",hint}}));
|
auth_level, connect_option::timeout{timeout});
|
||||||
|
|
||||||
return pubkey;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ConnectionID OxenMQ::connect_remote(const address& remote, ConnectSuccess on_connect, ConnectFailure on_failure,
|
ConnectionID OxenMQ::connect_remote(std::string_view remote, ConnectSuccess on_connect,
|
||||||
AuthLevel auth_level, std::chrono::milliseconds timeout) {
|
ConnectFailure on_failure, std::string_view pubkey, AuthLevel auth_level,
|
||||||
if (!proxy_thread.joinable())
|
std::chrono::milliseconds timeout) {
|
||||||
throw std::logic_error("Cannot call connect_remote() before calling `start()`");
|
return connect_remote(address{remote}.set_pubkey(pubkey), std::move(on_connect),
|
||||||
|
std::move(on_failure), auth_level, connect_option::timeout{timeout});
|
||||||
auto id = next_conn_id++;
|
|
||||||
LMQ_TRACE("telling proxy to connect to ", remote, ", id ", id);
|
|
||||||
detail::send_control(get_control_socket(), "CONNECT_REMOTE", bt_serialize<bt_dict>({
|
|
||||||
{"auth_level", static_cast<std::underlying_type_t<AuthLevel>>(auth_level)},
|
|
||||||
{"conn_id", id},
|
|
||||||
{"connect", detail::serialize_object(std::move(on_connect))},
|
|
||||||
{"failure", detail::serialize_object(std::move(on_failure))},
|
|
||||||
{"pubkey", remote.curve() ? remote.pubkey : ""},
|
|
||||||
{"remote", remote.zmq_address()},
|
|
||||||
{"timeout", timeout.count()},
|
|
||||||
}));
|
|
||||||
|
|
||||||
return id;
|
|
||||||
}
|
|
||||||
|
|
||||||
ConnectionID OxenMQ::connect_remote(std::string_view remote, ConnectSuccess on_connect, ConnectFailure on_failure,
|
|
||||||
std::string_view pubkey, AuthLevel auth_level, std::chrono::milliseconds timeout) {
|
|
||||||
return connect_remote(address{remote}.set_pubkey(pubkey),
|
|
||||||
std::move(on_connect), std::move(on_failure), auth_level, timeout);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void OxenMQ::disconnect(ConnectionID id, std::chrono::milliseconds linger) {
|
void OxenMQ::disconnect(ConnectionID id, std::chrono::milliseconds linger) {
|
||||||
|
@ -111,7 +90,7 @@ void OxenMQ::disconnect(ConnectionID id, std::chrono::milliseconds linger) {
|
||||||
}
|
}
|
||||||
|
|
||||||
std::pair<zmq::socket_t *, std::string>
|
std::pair<zmq::socket_t *, std::string>
|
||||||
OxenMQ::proxy_connect_sn(std::string_view remote, std::string_view connect_hint, bool optional, bool incoming_only, bool outgoing_only, std::chrono::milliseconds keep_alive) {
|
OxenMQ::proxy_connect_sn(std::string_view remote, std::string_view connect_hint, bool optional, bool incoming_only, bool outgoing_only, bool use_ephemeral_routing_id, std::chrono::milliseconds keep_alive) {
|
||||||
ConnectionID remote_cid{remote};
|
ConnectionID remote_cid{remote};
|
||||||
auto its = peers.equal_range(remote_cid);
|
auto its = peers.equal_range(remote_cid);
|
||||||
peer_info* peer = nullptr;
|
peer_info* peer = nullptr;
|
||||||
|
@ -163,7 +142,7 @@ OxenMQ::proxy_connect_sn(std::string_view remote, std::string_view connect_hint,
|
||||||
|
|
||||||
LMQ_LOG(debug, to_hex(pubkey), " (me) connecting to ", addr, " to reach ", to_hex(remote));
|
LMQ_LOG(debug, to_hex(pubkey), " (me) connecting to ", addr, " to reach ", to_hex(remote));
|
||||||
zmq::socket_t socket{context, zmq::socket_type::dealer};
|
zmq::socket_t socket{context, zmq::socket_type::dealer};
|
||||||
setup_outgoing_socket(socket, remote);
|
setup_outgoing_socket(socket, remote, use_ephemeral_routing_id);
|
||||||
try {
|
try {
|
||||||
socket.connect(addr);
|
socket.connect(addr);
|
||||||
} catch (const zmq::error_t& e) {
|
} catch (const zmq::error_t& e) {
|
||||||
|
@ -189,9 +168,11 @@ OxenMQ::proxy_connect_sn(std::string_view remote, std::string_view connect_hint,
|
||||||
std::pair<zmq::socket_t *, std::string> OxenMQ::proxy_connect_sn(bt_dict_consumer data) {
|
std::pair<zmq::socket_t *, std::string> OxenMQ::proxy_connect_sn(bt_dict_consumer data) {
|
||||||
std::string_view hint, remote_pk;
|
std::string_view hint, remote_pk;
|
||||||
std::chrono::milliseconds keep_alive;
|
std::chrono::milliseconds keep_alive;
|
||||||
bool optional = false, incoming_only = false, outgoing_only = false;
|
bool optional = false, incoming_only = false, outgoing_only = false, ephemeral_rid = EPHEMERAL_ROUTING_ID;
|
||||||
|
|
||||||
// Alphabetical order
|
// Alphabetical order
|
||||||
|
if (data.skip_until("ephemeral_rid"))
|
||||||
|
ephemeral_rid = data.consume_integer<bool>();
|
||||||
if (data.skip_until("hint"))
|
if (data.skip_until("hint"))
|
||||||
hint = data.consume_string_view();
|
hint = data.consume_string_view();
|
||||||
if (data.skip_until("incoming"))
|
if (data.skip_until("incoming"))
|
||||||
|
@ -206,7 +187,7 @@ std::pair<zmq::socket_t *, std::string> OxenMQ::proxy_connect_sn(bt_dict_consume
|
||||||
throw std::runtime_error("Internal error: Invalid proxy_connect_sn command; pubkey missing");
|
throw std::runtime_error("Internal error: Invalid proxy_connect_sn command; pubkey missing");
|
||||||
remote_pk = data.consume_string_view();
|
remote_pk = data.consume_string_view();
|
||||||
|
|
||||||
return proxy_connect_sn(remote_pk, hint, optional, incoming_only, outgoing_only, keep_alive);
|
return proxy_connect_sn(remote_pk, hint, optional, incoming_only, outgoing_only, ephemeral_rid, keep_alive);
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename Container, typename AccessIndex>
|
template <typename Container, typename AccessIndex>
|
||||||
|
@ -315,17 +296,18 @@ void OxenMQ::proxy_connect_remote(bt_dict_consumer data) {
|
||||||
std::string remote;
|
std::string remote;
|
||||||
std::string remote_pubkey;
|
std::string remote_pubkey;
|
||||||
std::chrono::milliseconds timeout = REMOTE_CONNECT_TIMEOUT;
|
std::chrono::milliseconds timeout = REMOTE_CONNECT_TIMEOUT;
|
||||||
|
bool ephemeral_rid = EPHEMERAL_ROUTING_ID;
|
||||||
|
|
||||||
if (data.skip_until("auth_level"))
|
if (data.skip_until("auth_level"))
|
||||||
auth_level = static_cast<AuthLevel>(data.consume_integer<std::underlying_type_t<AuthLevel>>());
|
auth_level = static_cast<AuthLevel>(data.consume_integer<std::underlying_type_t<AuthLevel>>());
|
||||||
if (data.skip_until("conn_id"))
|
if (data.skip_until("conn_id"))
|
||||||
conn_id = data.consume_integer<long long>();
|
conn_id = data.consume_integer<long long>();
|
||||||
if (data.skip_until("connect")) {
|
if (data.skip_until("connect"))
|
||||||
on_connect = detail::deserialize_object<ConnectSuccess>(data.consume_integer<uintptr_t>());
|
on_connect = detail::deserialize_object<ConnectSuccess>(data.consume_integer<uintptr_t>());
|
||||||
}
|
if (data.skip_until("ephemeral_rid"))
|
||||||
if (data.skip_until("failure")) {
|
ephemeral_rid = data.consume_integer<bool>();
|
||||||
|
if (data.skip_until("failure"))
|
||||||
on_failure = detail::deserialize_object<ConnectFailure>(data.consume_integer<uintptr_t>());
|
on_failure = detail::deserialize_object<ConnectFailure>(data.consume_integer<uintptr_t>());
|
||||||
}
|
|
||||||
if (data.skip_until("pubkey")) {
|
if (data.skip_until("pubkey")) {
|
||||||
remote_pubkey = data.consume_string();
|
remote_pubkey = data.consume_string();
|
||||||
assert(remote_pubkey.size() == 32 || remote_pubkey.empty());
|
assert(remote_pubkey.size() == 32 || remote_pubkey.empty());
|
||||||
|
@ -344,7 +326,7 @@ void OxenMQ::proxy_connect_remote(bt_dict_consumer data) {
|
||||||
|
|
||||||
zmq::socket_t sock{context, zmq::socket_type::dealer};
|
zmq::socket_t sock{context, zmq::socket_type::dealer};
|
||||||
try {
|
try {
|
||||||
setup_outgoing_socket(sock, remote_pubkey);
|
setup_outgoing_socket(sock, remote_pubkey, ephemeral_rid);
|
||||||
sock.connect(remote);
|
sock.connect(remote);
|
||||||
} catch (const zmq::error_t &e) {
|
} catch (const zmq::error_t &e) {
|
||||||
proxy_schedule_reply_job([conn_id, on_failure=std::move(on_failure), what="connect() failed: "s+e.what()] {
|
proxy_schedule_reply_job([conn_id, on_failure=std::move(on_failure), what="connect() failed: "s+e.what()] {
|
||||||
|
|
207
oxenmq/oxenmq.h
207
oxenmq/oxenmq.h
|
@ -74,6 +74,11 @@ template <typename R> class Batch;
|
||||||
*/
|
*/
|
||||||
inline constexpr auto DEFAULT_SEND_KEEP_ALIVE = 30s;
|
inline constexpr auto DEFAULT_SEND_KEEP_ALIVE = 30s;
|
||||||
|
|
||||||
|
/** Default keep-alive time for a connect_sn() (unless overridden via a connect_option::keep_alive
|
||||||
|
* argument).
|
||||||
|
*/
|
||||||
|
inline constexpr auto DEFAULT_CONNECT_SN_KEEP_ALIVE = 5min;
|
||||||
|
|
||||||
// The default timeout for connect_remote()
|
// The default timeout for connect_remote()
|
||||||
inline constexpr auto REMOTE_CONNECT_TIMEOUT = 10s;
|
inline constexpr auto REMOTE_CONNECT_TIMEOUT = 10s;
|
||||||
|
|
||||||
|
@ -193,15 +198,21 @@ public:
|
||||||
* closing the connection. Setting this only affects new outgoing connections. */
|
* closing the connection. Setting this only affects new outgoing connections. */
|
||||||
std::chrono::milliseconds HANDSHAKE_TIME = 10s;
|
std::chrono::milliseconds HANDSHAKE_TIME = 10s;
|
||||||
|
|
||||||
/** Whether to use a zmq routing ID based on the pubkey for new outgoing connections. This is
|
/** Whether to use a random zmq routing ID, or one based on the pubkey for new outgoing
|
||||||
* normally desirable as it allows the listener to recognize that the incoming connection is a
|
* connections. Using the pubkey is desirable when connections between endpoints are unique as
|
||||||
* reconnection from the same remote and handover routing to the new socket while closing off
|
* it allows the listener to recognize that the incoming connection is a reconnection from the
|
||||||
* the (likely dead) old socket. This, however, prevents a single OxenMQ instance from
|
* same remote and handover routing to the new socket while closing off the (likely dead) old
|
||||||
* establishing multiple connections to the same listening OxenMQ, which is sometimes useful
|
* socket. This, however, prevents a single OxenMQ instance (or multiple OxenMQ instances using
|
||||||
* (for example when testing), and so this option can be overridden to `false` to use completely
|
* the same keys) from establishing multiple connections to the same listening OxenMQ, which is
|
||||||
* random zmq routing ids on outgoing connections (which will thus allow multiple connections).
|
* sometimes useful (for example when testing, or when sharing an authentication key), and so
|
||||||
|
* this option can be overridden to `true` to use completely random zmq routing ids on outgoing
|
||||||
|
* connections (which will thus allow multiple connections).
|
||||||
|
*
|
||||||
|
* Note that this only affects the default for outgoing connections: you can override an
|
||||||
|
* individual connection by passing a connect_option::ephemeral_routing_id option into the
|
||||||
|
* connect_sn/connect_remote method.
|
||||||
*/
|
*/
|
||||||
bool PUBKEY_BASED_ROUTING_ID = true;
|
bool EPHEMERAL_ROUTING_ID = false;
|
||||||
|
|
||||||
/** Maximum incoming message size; if a remote tries sending a message larger than this they get
|
/** Maximum incoming message size; if a remote tries sending a message larger than this they get
|
||||||
* disconnected. -1 means no limit. */
|
* disconnected. -1 means no limit. */
|
||||||
|
@ -487,7 +498,7 @@ private:
|
||||||
// provided then the connection will be curve25519 encrypted and authenticate; otherwise it will
|
// provided then the connection will be curve25519 encrypted and authenticate; otherwise it will
|
||||||
// be unencrypted and unauthenticated. Note that the remote end must be in the same mode (i.e.
|
// be unencrypted and unauthenticated. Note that the remote end must be in the same mode (i.e.
|
||||||
// either accepting curve connections, or not accepting curve).
|
// either accepting curve connections, or not accepting curve).
|
||||||
void setup_outgoing_socket(zmq::socket_t& socket, std::string_view remote_pubkey = {});
|
void setup_outgoing_socket(zmq::socket_t& socket, std::string_view remote_pubkey, bool use_ephemeral_routing_id);
|
||||||
|
|
||||||
/// Common connection implementation used by proxy_connect/proxy_send. Returns the socket and,
|
/// Common connection implementation used by proxy_connect/proxy_send. Returns the socket and,
|
||||||
/// if a routing prefix is needed, the required prefix (or an empty string if not needed). For
|
/// if a routing prefix is needed, the required prefix (or an empty string if not needed). For
|
||||||
|
@ -503,8 +514,10 @@ private:
|
||||||
/// @param keep_alive the keep alive for the connection, if we establish a new outgoing
|
/// @param keep_alive the keep alive for the connection, if we establish a new outgoing
|
||||||
/// connection. If we already have an outgoing connection then its keep-alive gets increased to
|
/// connection. If we already have an outgoing connection then its keep-alive gets increased to
|
||||||
/// this if currently less than this.
|
/// this if currently less than this.
|
||||||
std::pair<zmq::socket_t*, std::string> proxy_connect_sn(std::string_view pubkey, std::string_view connect_hint,
|
/// @param ephemeral_routing_id whether or not to use a random (true) or pubkey-based (false) routing id
|
||||||
bool optional, bool incoming_only, bool outgoing_only, std::chrono::milliseconds keep_alive);
|
std::pair<zmq::socket_t*, std::string> proxy_connect_sn(std::string_view pubkey,
|
||||||
|
std::string_view connect_hint, bool optional, bool incoming_only, bool outgoing_only,
|
||||||
|
bool ephemeral_routing_id, std::chrono::milliseconds keep_alive);
|
||||||
|
|
||||||
/// CONNECT_SN command telling us to connect to a new pubkey. Returns the socket (which could
|
/// CONNECT_SN command telling us to connect to a new pubkey. Returns the socket (which could
|
||||||
/// be existing or a new one). This basically just unpacks arguments and passes them on to
|
/// be existing or a new one). This basically just unpacks arguments and passes them on to
|
||||||
|
@ -974,21 +987,23 @@ public:
|
||||||
* instructs the proxy thread that it should establish a connection.
|
* instructs the proxy thread that it should establish a connection.
|
||||||
*
|
*
|
||||||
* @param pubkey - the public key (32-byte binary string) of the service node to connect to
|
* @param pubkey - the public key (32-byte binary string) of the service node to connect to
|
||||||
* @param keep_alive - the connection will be kept alive if there was valid activity within
|
* @param options - connection options; see the structs in `connect_option`, in particular:
|
||||||
* the past `keep_alive` milliseconds. If an outgoing connection already
|
* - keep_alive -- how long the SN connection will be kept alive after valid activity
|
||||||
* exists, the longer of the existing and the given keep alive is used.
|
* - remote_hint -- a remote address hint that may be used instead of doing a lookup
|
||||||
* (Note that the default applied here is much longer than the default for an
|
* - ephemeral_routing_id -- allows you to override the EPHEMERAL_ROUTING_ID option for
|
||||||
* implicit connect() by calling send() directly.)
|
* this connection.
|
||||||
* @param hint - if non-empty and a new outgoing connection needs to be made this hint value
|
*
|
||||||
* may be used instead of calling the lookup function. (Note that there is no
|
* For backwards compatibility you may also directly pass (as a `options` value):
|
||||||
* guarantee that the hint will be used; it is only usefully specified if the
|
* - a std::chrono::duration duration (equivalent to connect_option::keep_alive{duration})
|
||||||
* connection address has already been incidentally determined).
|
* - a string or string_view hint (equivalent to connect_option::hint{hint})
|
||||||
|
* but these should be considered deprecated and the connection_option versions preferred.
|
||||||
*
|
*
|
||||||
* @returns a ConnectionID that identifies an connection with the given SN. Typically you
|
* @returns a ConnectionID that identifies an connection with the given SN. Typically you
|
||||||
* *don't* need to worry about this (and can just discard it): you can always simply pass the
|
* *don't* need to worry about this (and can just discard it): you can always simply pass the
|
||||||
* pubkey as a string wherever a ConnectionID is called.
|
* pubkey as a string wherever a ConnectionID is called.
|
||||||
*/
|
*/
|
||||||
ConnectionID connect_sn(std::string_view pubkey, std::chrono::milliseconds keep_alive = 5min, std::string_view hint = {});
|
template <typename... Option>
|
||||||
|
ConnectionID connect_sn(std::string_view pubkey, const Option&... opts);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Establish a connection to the given remote with callbacks invoked on a successful or failed
|
* Establish a connection to the given remote with callbacks invoked on a successful or failed
|
||||||
|
@ -1010,24 +1025,27 @@ public:
|
||||||
* address and whether curve encryption should be used.
|
* address and whether curve encryption should be used.
|
||||||
* @param on_connect called with the identifier after the connection has been established.
|
* @param on_connect called with the identifier after the connection has been established.
|
||||||
* @param on_failure called with the identifier and failure message if we fail to connect.
|
* @param on_failure called with the identifier and failure message if we fail to connect.
|
||||||
* @param auth_level determines the authentication level of the remote for issuing commands to
|
* @param options supports various connection options:
|
||||||
* us. The default is `AuthLevel::none`.
|
* - passing an AuthLevel here sets the auth_level for incoming messages on this
|
||||||
* @param timeout how long to try before aborting the connection attempt and calling the
|
* connection (instead of AuthLevel::none).
|
||||||
* on_failure callback. Note that the connection can fail for various reasons before the
|
* - anything else should be one of the `oxenmq::connect_option` structs.
|
||||||
* timeout.
|
* - passing a std::chrono::duration type is permitted (but deprecated) for backwards
|
||||||
|
* compatibility; it is equivalent to `connection_option::timeout{duration}`.
|
||||||
*
|
*
|
||||||
* @param returns ConnectionID that uniquely identifies the connection to this remote node. In
|
* @returns ConnectionID that uniquely identifies the connection to this remote node. In order
|
||||||
* order to talk to it you will need the returned value (or a copy of it).
|
* to talk to it you will need the returned value (or a copy of it).
|
||||||
*/
|
*/
|
||||||
|
template <typename... Option>
|
||||||
ConnectionID connect_remote(const address& remote, ConnectSuccess on_connect, ConnectFailure on_failure,
|
ConnectionID connect_remote(const address& remote, ConnectSuccess on_connect, ConnectFailure on_failure,
|
||||||
AuthLevel auth_level = AuthLevel::none, std::chrono::milliseconds timeout = REMOTE_CONNECT_TIMEOUT);
|
const Option&... options);
|
||||||
|
|
||||||
/// Same as the above, but takes the address as a string_view and constructs an `address` from
|
/// Deprecated connect_remote variants that take the address as a string view. The second
|
||||||
/// it.
|
/// version also takes a pubkey (for a secure connection) as a separate argument. Use of these
|
||||||
|
/// is deprecated and discouraged: use an address with connect_option::whatever arguments
|
||||||
|
/// instead.
|
||||||
|
[[deprecated("use connect_remote() with a oxenmq::address instead")]]
|
||||||
ConnectionID connect_remote(std::string_view remote, ConnectSuccess on_connect, ConnectFailure on_failure,
|
ConnectionID connect_remote(std::string_view remote, ConnectSuccess on_connect, ConnectFailure on_failure,
|
||||||
AuthLevel auth_level = AuthLevel::none, std::chrono::milliseconds timeout = REMOTE_CONNECT_TIMEOUT) {
|
AuthLevel auth_level = AuthLevel::none, std::chrono::milliseconds timeout = REMOTE_CONNECT_TIMEOUT);
|
||||||
return connect_remote(address{remote}, std::move(on_connect), std::move(on_failure), auth_level, timeout);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Deprecated version of the above that takes the remote address and remote pubkey for curve
|
/// Deprecated version of the above that takes the remote address and remote pubkey for curve
|
||||||
/// encryption as separate arguments. New code should either use a pubkey-embedded address
|
/// encryption as separate arguments. New code should either use a pubkey-embedded address
|
||||||
|
@ -1076,7 +1094,9 @@ public:
|
||||||
* call connect() first).
|
* call connect() first).
|
||||||
* @param cmd - the first data frame value which is almost always the remote "category.command" name
|
* @param cmd - the first data frame value which is almost always the remote "category.command" name
|
||||||
* @param opts - any number of std::string (or string_views) and send options. Each send option
|
* @param opts - any number of std::string (or string_views) and send options. Each send option
|
||||||
* affects how the send works; each string becomes a message part.
|
* affects how the send works; each string becomes a message part. May also
|
||||||
|
* contain std::optional<T> values: the value will be applied as a string or send
|
||||||
|
* option if set and skipped if null.
|
||||||
*
|
*
|
||||||
* Example:
|
* Example:
|
||||||
*
|
*
|
||||||
|
@ -1365,6 +1385,53 @@ struct queue_full {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Namespace for options to the connect_remote()/connect_sn() methods
|
||||||
|
namespace connect_option {
|
||||||
|
|
||||||
|
/// Specifies whether the connection should use pubkey-based routing for this connection, overriding
|
||||||
|
/// the default (OxenMQ::EPHEMERAL_ROUTING_ID). See OxenMQ::EPHEMERAL_ROUTING_ID for a description
|
||||||
|
/// of this.
|
||||||
|
///
|
||||||
|
/// Typically use: `connect_options::ephemeral_routing_id{}` or `connect_options::ephemeral_routing_id{false}`.
|
||||||
|
struct ephemeral_routing_id {
|
||||||
|
bool use_ephemeral_routing_id = true;
|
||||||
|
// Constructor; default construction gives you pubkey routing, but the bool parameter can be
|
||||||
|
// specified as false to explicitly disable the pubkey routing flag.
|
||||||
|
explicit ephemeral_routing_id(bool use = true) : use_ephemeral_routing_id{use} {}
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Sets the connection timeout (instead of the default REMOTE_CONNECT_TIMEOUT). Only applies to
|
||||||
|
/// connect_remote(), not connect_sn().
|
||||||
|
struct timeout {
|
||||||
|
std::chrono::milliseconds time;
|
||||||
|
explicit timeout(std::chrono::milliseconds time) : time{std::move(time)} {}
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Sets the connection keep-alive (only applies to connect_sn(), not connect_remote()). The
|
||||||
|
/// connection will be kept alive if there was valid activity within the past `keep_alive`
|
||||||
|
/// milliseconds. If an outgoing connection already exists, the longer of the existing and the
|
||||||
|
/// given keep alive is used.
|
||||||
|
///
|
||||||
|
/// Note that, if not specified, the default keep-alive for a connection established via
|
||||||
|
/// connect_sn() is 5 minutes (which is much longer than the default for an implicit connect() by
|
||||||
|
/// calling send() directly with a pubkey.)
|
||||||
|
struct keep_alive {
|
||||||
|
std::chrono::milliseconds time;
|
||||||
|
explicit keep_alive(std::chrono::milliseconds time) : time{std::move(time)} {}
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Sets a remote address hint for an outgoing SN connection (only applies to connect_sn()). If a
|
||||||
|
/// new outgoing connection needs to be made this hint value may be used instead of calling the
|
||||||
|
/// lookup function. (Note that there is no guarantee that the hint will be used; it is only
|
||||||
|
/// usefully specified if the connection address has already been incidentally determined to save a
|
||||||
|
/// potentially expensive lookup call).
|
||||||
|
struct hint {
|
||||||
|
std::string address;
|
||||||
|
explicit hint(std::string_view address) : address{address} {}
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
namespace detail {
|
namespace detail {
|
||||||
|
|
||||||
/// Takes an rvalue reference, moves it into a new instance then returns a uintptr_t value
|
/// Takes an rvalue reference, moves it into a new instance then returns a uintptr_t value
|
||||||
|
@ -1397,6 +1464,13 @@ inline void apply_send_option(bt_list& parts, bt_dict&, std::string_view arg) {
|
||||||
parts.emplace_back(arg);
|
parts.emplace_back(arg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// std::optional<T>: if the optional is set, we unwrap it and apply as a send_option, otherwise we
|
||||||
|
/// ignore it.
|
||||||
|
template <typename T>
|
||||||
|
inline void apply_send_option(bt_list& parts, bt_dict& control_data, const std::optional<T>& opt) {
|
||||||
|
if (opt) apply_send_option(parts, control_data, *opt);
|
||||||
|
}
|
||||||
|
|
||||||
/// `data_parts` specialization: appends a range of serialized data parts to the parts to send
|
/// `data_parts` specialization: appends a range of serialized data parts to the parts to send
|
||||||
template <typename InputIt>
|
template <typename InputIt>
|
||||||
void apply_send_option(bt_list& parts, bt_dict&, const send_option::data_parts_impl<InputIt> data) {
|
void apply_send_option(bt_list& parts, bt_dict&, const send_option::data_parts_impl<InputIt> data) {
|
||||||
|
@ -1464,8 +1538,71 @@ bt_dict build_send(ConnectionID to, std::string_view cmd, T&&... opts) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inline void apply_connect_option(OxenMQ& omq, bool remote, bt_dict& opts, const AuthLevel& auth) {
|
||||||
|
if (remote) opts["auth_level"] = static_cast<std::underlying_type_t<AuthLevel>>(auth);
|
||||||
|
else omq.log(LogLevel::warn, __FILE__, __LINE__, "AuthLevel ignored for connect_sn(...)");
|
||||||
|
}
|
||||||
|
inline void apply_connect_option(OxenMQ&, bool, bt_dict& opts, const connect_option::ephemeral_routing_id& er) {
|
||||||
|
opts["ephemeral_rid"] = er.use_ephemeral_routing_id;
|
||||||
|
}
|
||||||
|
inline void apply_connect_option(OxenMQ& omq, bool remote, bt_dict& opts, const connect_option::timeout& timeout) {
|
||||||
|
if (remote) opts["timeout"] = timeout.time.count();
|
||||||
|
else omq.log(LogLevel::warn, __FILE__, __LINE__, "connect_option::timeout ignored for connect_sn(...)");
|
||||||
|
}
|
||||||
|
inline void apply_connect_option(OxenMQ& omq, bool remote, bt_dict& opts, const connect_option::keep_alive& ka) {
|
||||||
|
if (!remote) opts["keep_alive"] = ka.time.count();
|
||||||
|
else omq.log(LogLevel::warn, __FILE__, __LINE__, "connect_option::keep_alive ignored for connect_remote(...)");
|
||||||
|
}
|
||||||
|
inline void apply_connect_option(OxenMQ& omq, bool remote, bt_dict& opts, const connect_option::hint& hint) {
|
||||||
|
if (!remote) opts["hint"] = hint.address;
|
||||||
|
else omq.log(LogLevel::warn, __FILE__, __LINE__, "connect_option::hint ignored for connect_remote(...)");
|
||||||
|
}
|
||||||
|
[[deprecated("use oxenmq::connect_option::keep_alive or ::timeout instead")]]
|
||||||
|
inline void apply_connect_option(OxenMQ&, bool remote, bt_dict& opts, std::chrono::milliseconds time) {
|
||||||
|
if (remote) opts["timeout"] = time.count();
|
||||||
|
else opts["keep_alive"] = time.count();
|
||||||
|
}
|
||||||
|
[[deprecated("use oxenmq::connect_option::hint{hint} instead of a direct string argument")]]
|
||||||
|
inline void apply_connect_option(OxenMQ& omq, bool remote, bt_dict& opts, std::string_view hint) {
|
||||||
|
if (!remote) opts["hint"] = hint;
|
||||||
|
else omq.log(LogLevel::warn, __FILE__, __LINE__, "string argument ignored for connect_remote(...)");
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace detail
|
} // namespace detail
|
||||||
|
|
||||||
|
template <typename... Option>
|
||||||
|
ConnectionID OxenMQ::connect_remote(const address& remote, ConnectSuccess on_connect, ConnectFailure on_failure,
|
||||||
|
const Option&... options) {
|
||||||
|
bt_dict opts;
|
||||||
|
(detail::apply_connect_option(*this, true, opts, options), ...);
|
||||||
|
|
||||||
|
auto id = next_conn_id++;
|
||||||
|
opts["conn_id"] = id;
|
||||||
|
opts["connect"] = detail::serialize_object(std::move(on_connect));
|
||||||
|
opts["failure"] = detail::serialize_object(std::move(on_failure));
|
||||||
|
if (remote.curve()) opts["pubkey"] = remote.pubkey;
|
||||||
|
opts["remote"] = remote.zmq_address();
|
||||||
|
|
||||||
|
detail::send_control(get_control_socket(), "CONNECT_REMOTE", bt_serialize(opts));
|
||||||
|
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename... Option>
|
||||||
|
ConnectionID OxenMQ::connect_sn(std::string_view pubkey, const Option&... options) {
|
||||||
|
bt_dict opts{
|
||||||
|
{"keep_alive", std::chrono::microseconds{DEFAULT_CONNECT_SN_KEEP_ALIVE}.count()},
|
||||||
|
{"ephemeral_rid", EPHEMERAL_ROUTING_ID},
|
||||||
|
};
|
||||||
|
|
||||||
|
(detail::apply_connect_option(*this, false, opts, options), ...);
|
||||||
|
|
||||||
|
opts["pubkey"] = pubkey;
|
||||||
|
|
||||||
|
detail::send_control(get_control_socket(), "CONNECT_SN", bt_serialize(opts));
|
||||||
|
|
||||||
|
return pubkey;
|
||||||
|
}
|
||||||
|
|
||||||
template <typename... T>
|
template <typename... T>
|
||||||
void OxenMQ::send(ConnectionID to, std::string_view cmd, const T&... opts) {
|
void OxenMQ::send(ConnectionID to, std::string_view cmd, const T&... opts) {
|
||||||
|
|
|
@ -116,7 +116,7 @@ void OxenMQ::proxy_send(bt_dict_consumer data) {
|
||||||
retry = false;
|
retry = false;
|
||||||
zmq::socket_t *send_to;
|
zmq::socket_t *send_to;
|
||||||
if (conn_id.sn()) {
|
if (conn_id.sn()) {
|
||||||
auto sock_route = proxy_connect_sn(conn_id.pk, hint, optional, incoming, outgoing, keep_alive);
|
auto sock_route = proxy_connect_sn(conn_id.pk, hint, optional, incoming, outgoing, EPHEMERAL_ROUTING_ID, keep_alive);
|
||||||
if (!sock_route.first) {
|
if (!sock_route.first) {
|
||||||
nowarn = true;
|
nowarn = true;
|
||||||
if (optional)
|
if (optional)
|
||||||
|
@ -176,7 +176,12 @@ void OxenMQ::proxy_send(bt_dict_consumer data) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!retry) {
|
if (!retry) {
|
||||||
LMQ_LOG(warn, "Unable to send message to ", conn_id, ": ", e.what());
|
if (!conn_id.sn() && !conn_id.route.empty()) { // incoming non-SN connection
|
||||||
|
LMQ_LOG(debug, "Unable to send message to incoming connection ", conn_id, ": ", e.what(),
|
||||||
|
"; remote has probably disconnected");
|
||||||
|
} else {
|
||||||
|
LMQ_LOG(warn, "Unable to send message to ", conn_id, ": ", e.what());
|
||||||
|
}
|
||||||
nowarn = true;
|
nowarn = true;
|
||||||
if (callback_nosend) {
|
if (callback_nosend) {
|
||||||
job([callback = std::move(callback_nosend), error = e] { callback(&error); });
|
job([callback = std::move(callback_nosend), error = e] { callback(&error); });
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
namespace oxenmq {
|
namespace oxenmq {
|
||||||
constexpr int VERSION_MAJOR = @OXENMQ_VERSION_MAJOR@;
|
constexpr int VERSION_MAJOR = @PROJECT_VERSION_MAJOR@;
|
||||||
constexpr int VERSION_MINOR = @OXENMQ_VERSION_MINOR@;
|
constexpr int VERSION_MINOR = @PROJECT_VERSION_MINOR@;
|
||||||
constexpr int VERSION_PATCH = @OXENMQ_VERSION_PATCH@;
|
constexpr int VERSION_PATCH = @PROJECT_VERSION_PATCH@;
|
||||||
}
|
}
|
||||||
|
|
|
@ -104,7 +104,7 @@ TEST_CASE("outgoing auth level", "[commands][auth]") {
|
||||||
client.add_command("admin", "hi", [&](auto&) { admin_hi++; });
|
client.add_command("admin", "hi", [&](auto&) { admin_hi++; });
|
||||||
client.start();
|
client.start();
|
||||||
|
|
||||||
client.PUBKEY_BASED_ROUTING_ID = false; // establishing multiple connections below, so we need unique routing ids
|
client.EPHEMERAL_ROUTING_ID = true; // establishing multiple connections below, so we need unique routing ids
|
||||||
|
|
||||||
address server_addr{listen, server.get_pubkey()};
|
address server_addr{listen, server.get_pubkey()};
|
||||||
auto public_c = client.connect_remote(server_addr, [](auto&&...) {}, [](auto&&...) {});
|
auto public_c = client.connect_remote(server_addr, [](auto&&...) {}, [](auto&&...) {});
|
||||||
|
@ -415,13 +415,19 @@ TEST_CASE("data parts", "[send][data_parts]") {
|
||||||
r.clear();
|
r.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::optional<std::string_view> opt1, opt2;
|
||||||
|
std::optional<std::string> opt3, opt4;
|
||||||
|
opt1 = "o1"sv;
|
||||||
|
opt4 = "o4"s;
|
||||||
std::vector some_data2{{"a"sv, "b"sv, "\0"sv}};
|
std::vector some_data2{{"a"sv, "b"sv, "\0"sv}};
|
||||||
client.send(c, "public.hello",
|
client.send(c, "public.hello",
|
||||||
"hi",
|
"hi",
|
||||||
oxenmq::send_option::data_parts(some_data2.begin(), some_data2.end()),
|
oxenmq::send_option::data_parts(some_data2.begin(), some_data2.end()),
|
||||||
"another",
|
"another",
|
||||||
"string"sv,
|
"string"sv,
|
||||||
oxenmq::send_option::data_parts(some_data.begin(), some_data.end()));
|
oxenmq::send_option::data_parts(some_data.begin(), some_data.end()),
|
||||||
|
opt1, opt2, opt3, opt4
|
||||||
|
);
|
||||||
|
|
||||||
std::vector<std::string> expected;
|
std::vector<std::string> expected;
|
||||||
expected.push_back("hi");
|
expected.push_back("hi");
|
||||||
|
@ -429,6 +435,8 @@ TEST_CASE("data parts", "[send][data_parts]") {
|
||||||
expected.push_back("another");
|
expected.push_back("another");
|
||||||
expected.push_back("string");
|
expected.push_back("string");
|
||||||
expected.insert(expected.end(), some_data.begin(), some_data.end());
|
expected.insert(expected.end(), some_data.begin(), some_data.end());
|
||||||
|
expected.push_back("o1");
|
||||||
|
expected.push_back("o4");
|
||||||
|
|
||||||
reply_sleep();
|
reply_sleep();
|
||||||
{
|
{
|
||||||
|
|
|
@ -107,7 +107,7 @@ TEST_CASE("plain-text connections", "[plaintext][connect]") {
|
||||||
|
|
||||||
std::atomic<bool> got{false};
|
std::atomic<bool> got{false};
|
||||||
bool success = false;
|
bool success = false;
|
||||||
auto c = client.connect_remote(listen,
|
auto c = client.connect_remote(address{listen},
|
||||||
[&](auto conn) { success = true; got = true; },
|
[&](auto conn) { success = true; got = true; },
|
||||||
[&](auto conn, std::string_view reason) { auto lock = catch_lock(); INFO("connection failed: " << reason); got = true; }
|
[&](auto conn, std::string_view reason) { auto lock = catch_lock(); INFO("connection failed: " << reason); got = true; }
|
||||||
);
|
);
|
||||||
|
@ -149,11 +149,11 @@ TEST_CASE("unique connection IDs", "[connect][id]") {
|
||||||
client2.start();
|
client2.start();
|
||||||
|
|
||||||
std::atomic<bool> good1{false}, good2{false};
|
std::atomic<bool> good1{false}, good2{false};
|
||||||
auto r1 = client1.connect_remote(listen,
|
auto r1 = client1.connect_remote(address{listen},
|
||||||
[&](auto conn) { good1 = true; },
|
[&](auto conn) { good1 = true; },
|
||||||
[&](auto conn, std::string_view reason) { auto lock = catch_lock(); INFO("connection failed: " << reason); }
|
[&](auto conn, std::string_view reason) { auto lock = catch_lock(); INFO("connection failed: " << reason); }
|
||||||
);
|
);
|
||||||
auto r2 = client2.connect_remote(listen,
|
auto r2 = client2.connect_remote(address{listen},
|
||||||
[&](auto conn) { good2 = true; },
|
[&](auto conn) { good2 = true; },
|
||||||
[&](auto conn, std::string_view reason) { auto lock = catch_lock(); INFO("connection failed: " << reason); }
|
[&](auto conn, std::string_view reason) { auto lock = catch_lock(); INFO("connection failed: " << reason); }
|
||||||
);
|
);
|
||||||
|
@ -370,7 +370,7 @@ TEST_CASE("SN single worker test", "[connect][worker]") {
|
||||||
|
|
||||||
OxenMQ client{get_logger("B» "), LogLevel::trace};
|
OxenMQ client{get_logger("B» "), LogLevel::trace};
|
||||||
client.start();
|
client.start();
|
||||||
auto conn = client.connect_remote(listen, [](auto) {}, [](auto, auto) {});
|
auto conn = client.connect_remote(address{listen}, [](auto) {}, [](auto, auto) {});
|
||||||
|
|
||||||
std::atomic<int> got{0};
|
std::atomic<int> got{0};
|
||||||
std::atomic<int> success{0};
|
std::atomic<int> success{0};
|
||||||
|
|
Loading…
Reference in a new issue