Avoid segfault on retried SN connection request

When we fail to send to a SN but can retry (e.g. because we had an
incoming connection which no longer works, but can retry an outgoing
connection) we were recursing, but this was resulting in a double-free
of the request callback (since we'd try to take ownership of the
incoming serialized pointer twice).

Rewrite the code to use a loop with single ownership instead.

This also changes the request callback behaviour to fire a failure
callback immediately if we can't send a request; previously you'd have
to wait for a timeout, but that is pointless if we couldn't get the
request out.
This commit is contained in:
Jason Rhinelander 2020-03-27 14:49:02 -03:00
parent a7c669775f
commit 0639bfa629
1 changed files with 86 additions and 64 deletions

View File

@ -28,8 +28,6 @@ void LokiMQ::proxy_quit() {
}
void LokiMQ::proxy_send(bt_dict_consumer data) {
bt_dict_consumer orig_data = data;
// NB: bt_dict_consumer goes in alphabetical order
string_view hint;
std::chrono::milliseconds keep_alive{DEFAULT_SEND_KEEP_ALIVE};
@ -41,7 +39,7 @@ void LokiMQ::proxy_send(bt_dict_consumer data) {
ConnectionID conn_id;
std::string request_tag;
std::unique_ptr<ReplyCallback> request_cbptr;
ReplyCallback request_callback;
if (data.skip_until("conn_id")) {
conn_id.id = data.consume_integer<long long>();
if (conn_id.id == -1)
@ -71,7 +69,15 @@ void LokiMQ::proxy_send(bt_dict_consumer data) {
if (request) {
if (!data.skip_until("request_callback"))
throw std::runtime_error("Internal error: received request without request_callback");
request_cbptr.reset(reinterpret_cast<ReplyCallback*>(data.consume_integer<uintptr_t>()));
// The initiator gives up ownership of the callback to us (serializing it through a
// uintptr_t), so we take the pointer, move the value out of it, then destroy the pointer we
// were given. Further down, if we are able to send the request successfully, we set up the
// pending request.
auto* cbptr = reinterpret_cast<ReplyCallback*>(data.consume_integer<uintptr_t>());
request_callback = std::move(*cbptr);
delete cbptr;
if (!data.skip_until("request_tag"))
throw std::runtime_error("Internal error: received request without request_name");
request_tag = data.consume_string();
@ -82,72 +88,88 @@ void LokiMQ::proxy_send(bt_dict_consumer data) {
throw std::runtime_error("Internal error: Invalid proxy send command; send parts missing");
bt_list_consumer send = data.consume_list_consumer();
zmq::socket_t *send_to;
if (conn_id.sn()) {
auto sock_route = proxy_connect_sn(conn_id.pk, hint, optional, incoming, keep_alive);
if (!sock_route.first) {
if (optional)
LMQ_LOG(debug, "Not sending: send is optional and no connection to ",
to_hex(conn_id.pk), " is currently established");
else
LMQ_LOG(error, "Unable to send to ", to_hex(conn_id.pk), ": no connection address found");
return;
}
send_to = sock_route.first;
conn_id.route = std::move(sock_route.second);
} else if (!conn_id.route.empty()) { // incoming non-SN connection
auto it = incoming_conn_index.find(conn_id);
if (it == incoming_conn_index.end()) {
LMQ_LOG(warn, "Unable to send to ", conn_id, ": incoming listening socket not found");
return;
}
send_to = &connections[it->second];
} else {
auto pr = peers.equal_range(conn_id);
if (pr.first == peers.end()) {
LMQ_LOG(warn, "Unable to send: connection id ", conn_id, " is not (or is no longer) a valid outgoing connection");
return;
}
auto& peer = pr.first->second;
send_to = &connections[peer.conn_index];
}
if (request) {
LMQ_LOG(debug, "Added new pending request ", to_hex(request_tag));
pending_requests.insert({ request_tag, {
std::chrono::steady_clock::now() + request_timeout, std::move(*request_cbptr) }});
}
try {
send_message_parts(*send_to, build_send_parts(send, conn_id.route));
} catch (const zmq::error_t &e) {
if (e.num() == EHOSTUNREACH && !conn_id.route.empty() /*= incoming conn*/) {
LMQ_LOG(debug, "Incoming connection is no longer valid; removing peer details");
// Our incoming connection no longer exists; remove it from `peers`.
// Now figure out which socket to send to and do the actual sending. We can repeat this loop
// multiple times, if we're sending to a SN, because it's possible that we have multiple
// connections open to that SN (e.g. one out + one in) so if one fails we can clean up that
// connection and try the next one.
bool retry = true, sent = false;
while (retry) {
retry = false;
zmq::socket_t *send_to;
if (conn_id.sn()) {
auto sock_route = proxy_connect_sn(conn_id.pk, hint, optional, incoming, keep_alive);
if (!sock_route.first) {
if (optional)
LMQ_LOG(debug, "Not sending: send is optional and no connection to ",
to_hex(conn_id.pk), " is currently established");
else
LMQ_LOG(error, "Unable to send to ", to_hex(conn_id.pk), ": no connection address found");
break;
}
send_to = sock_route.first;
conn_id.route = std::move(sock_route.second);
} else if (!conn_id.route.empty()) { // incoming non-SN connection
auto it = incoming_conn_index.find(conn_id);
if (it == incoming_conn_index.end()) {
LMQ_LOG(warn, "Unable to send to ", conn_id, ": incoming listening socket not found");
break;
}
send_to = &connections[it->second];
} else {
auto pr = peers.equal_range(conn_id);
if (pr.first != peers.end()) {
if (!conn_id.sn()) {
peers.erase(pr.first);
} else {
bool removed;
for (auto it = pr.first; it != pr.second; ) {
auto& peer = it->second;
if (peer.route == conn_id.route) {
peers.erase(it);
removed = true;
break;
if (pr.first == peers.end()) {
LMQ_LOG(warn, "Unable to send: connection id ", conn_id, " is not (or is no longer) a valid outgoing connection");
break;
}
auto& peer = pr.first->second;
send_to = &connections[peer.conn_index];
}
try {
send_message_parts(*send_to, build_send_parts(send, conn_id.route));
sent = true;
} catch (const zmq::error_t &e) {
if (e.num() == EHOSTUNREACH && !conn_id.route.empty() /*= incoming conn*/) {
LMQ_LOG(debug, "Incoming connection is no longer valid; removing peer details");
auto pr = peers.equal_range(conn_id);
if (pr.first != peers.end()) {
if (!conn_id.sn()) {
peers.erase(pr.first);
} else {
bool removed;
for (auto it = pr.first; it != pr.second; ) {
auto& peer = it->second;
if (peer.route == conn_id.route) {
peers.erase(it);
removed = true;
break;
}
}
// The incoming connection to the SN is no longer good, but we can retry because
// we may have another active connection with the SN (or may want to open one).
if (removed) {
LMQ_LOG(debug, "Retrying sending to SN ", to_hex(conn_id.pk), " using other sockets");
retry = true;
}
}
// The incoming connection to the SN is no longer good, but we can retry because
// we may have another active connection with the SN (or may want to open one).
if (removed) {
LMQ_LOG(debug, "Retrying sending to SN ", to_hex(conn_id.pk), " using other sockets");
return proxy_send(orig_data);
}
}
}
if (!retry) {
LMQ_LOG(warn, "Unable to send message to ", conn_id, ": ", e.what());
}
}
}
if (request) {
if (sent) {
LMQ_LOG(debug, "Added new pending request ", to_hex(request_tag));
pending_requests.insert({ request_tag, {
std::chrono::steady_clock::now() + request_timeout, std::move(request_callback) }});
} else {
LMQ_LOG(debug, "Could not send request, scheduling request callback failure");
job([callback = std::move(request_callback)] { callback(false, {}); });
}
LMQ_LOG(warn, "Unable to send message to ", conn_id, ": ", e.what());
}
}