Add `send_option::outgoing` to force a send on an outgoing connection

SS wants this, in particular, to be able to do reachability tests.
(Using connect_remote for this was bad with pubkey-based routing ids
because the second connection could replace an existing connection).
This commit is contained in:
Jason Rhinelander 2020-04-03 01:31:45 -03:00
parent b9e9f10f29
commit e3a86aaf71
3 changed files with 30 additions and 5 deletions

View File

@ -54,13 +54,15 @@ void LokiMQ::setup_outgoing_socket(zmq::socket_t& socket, string_view remote_pub
}
std::pair<zmq::socket_t *, std::string>
LokiMQ::proxy_connect_sn(string_view remote, string_view connect_hint, bool optional, bool incoming_only, std::chrono::milliseconds keep_alive) {
LokiMQ::proxy_connect_sn(string_view remote, string_view connect_hint, bool optional, bool incoming_only, bool outgoing_only, std::chrono::milliseconds keep_alive) {
ConnectionID remote_cid{remote};
auto its = peers.equal_range(remote_cid);
peer_info* peer = nullptr;
for (auto it = its.first; it != its.second; ++it) {
if (incoming_only && it->second.route.empty())
continue; // outgoing connection but we were asked to only use incoming connections
if (outgoing_only && !it->second.route.empty())
continue;
peer = &it->second;
break;
}
@ -129,7 +131,7 @@ LokiMQ::proxy_connect_sn(string_view remote, string_view connect_hint, bool opti
std::pair<zmq::socket_t *, std::string> LokiMQ::proxy_connect_sn(bt_dict_consumer data) {
string_view hint, remote_pk;
std::chrono::milliseconds keep_alive;
bool optional = false, incoming_only = false;
bool optional = false, incoming_only = false, outgoing_only = false;
// Alphabetical order
if (data.skip_until("hint"))
@ -140,11 +142,13 @@ std::pair<zmq::socket_t *, std::string> LokiMQ::proxy_connect_sn(bt_dict_consume
keep_alive = std::chrono::milliseconds{data.consume_integer<uint64_t>()};
if (data.skip_until("optional"))
optional = data.consume_integer<bool>();
if (data.skip_until("outgoing_only"))
outgoing_only = data.consume_integer<bool>();
if (!data.skip_until("pubkey"))
throw std::runtime_error("Internal error: Invalid proxy_connect_sn command; pubkey missing");
remote_pk = data.consume_string();
return proxy_connect_sn(remote_pk, hint, optional, incoming_only, keep_alive);
return proxy_connect_sn(remote_pk, hint, optional, incoming_only, outgoing_only, keep_alive);
}
template <typename Container, typename AccessIndex>

View File

@ -434,7 +434,7 @@ private:
/// connection. If we already have an outgoing connection then its keep-alive gets increased to
/// this if currently less than this.
std::pair<zmq::socket_t*, std::string> proxy_connect_sn(string_view pubkey, string_view connect_hint,
bool optional, bool incoming_only, std::chrono::milliseconds keep_alive);
bool optional, bool incoming_only, bool outgoing_only, std::chrono::milliseconds keep_alive);
/// CONNECT_SN command telling us to connect to a new pubkey. Returns the socket (which could
/// be existing or a new one). This basically just unpacks arguments and passes them on to
@ -1050,6 +1050,19 @@ struct incoming {
explicit incoming(bool inc = true) : is_incoming{inc} {}
};
/// Specifies that the message must use an outgoing connection; for messages to a service node the
/// message will be delivered over an existing outgoing connection, if one is established, and a new
/// outgoing connection opened to deliver the message if none is currently established. For non-SN
/// messages, the message will simply be dropped if it is attempting to be sent on an incoming
/// socket, and send otherwise on an outgoing socket (this option is primarily aimed at SN
/// messages).
struct outgoing {
bool is_outgoing = true;
// Constructor; default construction gives you an outgoing-only, but the bool parameter can be
// specified as false to explicitly disable the outgoing-only flag.
explicit outgoing(bool out = true) : is_outgoing{out} {}
};
/// Specifies the idle timeout for the connection - if a new or existing outgoing connection is used
/// for the send and its current idle timeout setting is less than this value then it is updated.
struct keep_alive {
@ -1152,6 +1165,11 @@ inline void apply_send_option(bt_list&, bt_dict& control_data, const send_option
control_data["incoming"] = i.is_incoming;
}
/// `outgoing` specialization: sets the outgoing-only flag in the control data
inline void apply_send_option(bt_list&, bt_dict& control_data, const send_option::outgoing& o) {
control_data["outgoing"] = o.is_outgoing;
}
/// `keep_alive` specialization: increases the outgoing socket idle timeout (if shorter)
inline void apply_send_option(bt_list&, bt_dict& control_data, const send_option::keep_alive& timeout) {
control_data["keep_alive"] = timeout.time.count();

View File

@ -33,6 +33,7 @@ void LokiMQ::proxy_send(bt_dict_consumer data) {
std::chrono::milliseconds keep_alive{DEFAULT_SEND_KEEP_ALIVE};
std::chrono::milliseconds request_timeout{DEFAULT_REQUEST_TIMEOUT};
bool optional = false;
bool outgoing = false;
bool incoming = false;
bool request = false;
bool have_conn_id = false;
@ -63,6 +64,8 @@ void LokiMQ::proxy_send(bt_dict_consumer data) {
keep_alive = std::chrono::milliseconds{data.consume_integer<uint64_t>()};
if (data.skip_until("optional"))
optional = data.consume_integer<bool>();
if (data.skip_until("outgoing"))
outgoing = data.consume_integer<bool>();
if (data.skip_until("request"))
request = data.consume_integer<bool>();
@ -100,7 +103,7 @@ void LokiMQ::proxy_send(bt_dict_consumer data) {
retry = false;
zmq::socket_t *send_to;
if (conn_id.sn()) {
auto sock_route = proxy_connect_sn(conn_id.pk, hint, optional, incoming, keep_alive);
auto sock_route = proxy_connect_sn(conn_id.pk, hint, optional, incoming, outgoing, keep_alive);
if (!sock_route.first) {
if (optional)
LMQ_LOG(debug, "Not sending: send is optional and no connection to ",