From bcd64088313490909cd4e26a0378737eecb22410 Mon Sep 17 00:00:00 2001 From: Maxim Shishmarev Date: Fri, 5 Jul 2019 16:49:26 +1000 Subject: [PATCH] Don't forget to unsubscribe connections from message notifications (#214) --- httpserver/http_connection.cpp | 129 +++++++++++++++++++++------------ httpserver/http_connection.h | 14 ++-- httpserver/https_client.cpp | 10 ++- httpserver/service_node.cpp | 44 ++++++++++- httpserver/service_node.h | 3 + 5 files changed, 143 insertions(+), 57 deletions(-) diff --git a/httpserver/http_connection.cpp b/httpserver/http_connection.cpp index c513eaf..64b86f8 100644 --- a/httpserver/http_connection.cpp +++ b/httpserver/http_connection.cpp @@ -48,10 +48,6 @@ constexpr auto TEST_RETRY_PERIOD = std::chrono::milliseconds(50); // (rounded up) constexpr size_t MAX_MESSAGE_BODY = 3100; -static void log_error(const error_code& ec) { - LOKI_LOG(error, "Error{}: {}", ec.value(), ec.message()); -} - void make_http_request(boost::asio::io_context& ioc, const std::string& sn_address, uint16_t port, const std::shared_ptr& req, @@ -152,8 +148,10 @@ accept_connection(boost::asio::io_context& ioc, security) ->start(); - if (ec) - log_error(ec); + if (ec) { + LOKI_LOG(error, "Could not accept a new connection {}: {}", + ec.value(), ec.message()); + } accept_connection(ioc, ssl_ctx, acceptor, sn, channel_encryption, rate_limiter, security); @@ -193,17 +191,23 @@ connection_t::connection_t(boost::asio::io_context& ioc, ssl::context& ssl_ctx, : ioc_(ioc), ssl_ctx_(ssl_ctx), socket_(std::move(socket)), stream_(socket_, ssl_ctx_), service_node_(sn), channel_cipher_(channel_encryption), rate_limiter_(rate_limiter), - repeat_timer_(ioc), deadline_(ioc, SESSION_TIME_LIMIT), - notification_ctx_({boost::asio::steady_timer{ioc}, boost::none}), + repeat_timer_(ioc), + deadline_(ioc, SESSION_TIME_LIMIT), notification_ctx_{boost::none}, security_(security) { - LOKI_LOG(trace, "connection_t"); + static uint64_t instance_counter = 0; + conn_idx = 
instance_counter++; + + LOKI_LOG(debug, "connection_t [{}]", conn_idx); start_timestamp_ = std::chrono::steady_clock::now(); } connection_t::~connection_t() { + // TODO: should check if we are still registered for + // notifications, and deregister if so. + // Safety net if (stream_.lowest_layer().is_open()) { LOKI_LOG(warn, "Client socket should be closed by this point, but " @@ -211,7 +215,7 @@ connection_t::~connection_t() { stream_.lowest_layer().close(); } - LOKI_LOG(trace, "~connection_t"); + LOKI_LOG(debug, "~connection_t [{}]", conn_idx); } void connection_t::start() { @@ -236,17 +240,21 @@ void connection_t::on_handshake(boost::system::error_code ec) { read_request(); } -void connection_t::notify(const message_t& msg) { - LOKI_LOG(debug, "Processing message notification: {}", msg.data); - // save messages, so we can access them once the timer event happens - notification_ctx_.message = msg; - // the timer callback will be called once we complete the current callback - notification_ctx_.timer.cancel(); -} +void connection_t::notify(boost::optional msg) { -void connection_t::reset() { - LOKI_LOG(debug, "Resetting the connection"); - notification_ctx_.timer.cancel(); + if (!notification_ctx_) { + LOKI_LOG(error, + "Trying to notify a connection without notification context"); + return; + } + + if (msg) { + LOKI_LOG(trace, "Processing message notification: {}", msg->data); + // save messages, so we can access them once the timer event happens + notification_ctx_->message = msg; + } + // the timer callback will be called once we complete the current callback + notification_ctx_->timer.cancel(); } // Asynchronously receive a complete request message. 
@@ -258,7 +266,10 @@ void connection_t::read_request() { LOKI_LOG(trace, "on data: {} bytes", bytes_transferred); if (ec) { - log_error(ec); + LOKI_LOG( + error, + "Failed to read from a socket [{}: {}], connection idx: {}", + ec.value(), ec.message(), self->conn_idx); return; } @@ -266,7 +277,8 @@ void connection_t::read_request() { try { self->process_request(); } catch (const std::exception& e) { - LOKI_LOG(error, "Exception caught: {}", e.what()); + LOKI_LOG(error, "Exception caught processing a request: {}", + e.what()); self->body_stream_ << e.what(); } @@ -348,7 +360,9 @@ void connection_t::process_storage_test_req(uint64_t height, tester_addr](const error_code& ec) { if (ec) { if (ec != boost::asio::error::operation_aborted) { - log_error(ec); + LOKI_LOG(error, + "Repeat timer failed for storage test [{}: {}]", + ec.value(), ec.message()); } } else { self->process_storage_test_req(height, tester_addr, msg_hash); @@ -862,27 +876,32 @@ void connection_t::poll_db(const std::string& pk, // until new data arrives for this PubKey service_node_.register_listener(pk, self); - notification_ctx_.timer.expires_after(LONG_POLL_TIMEOUT); - notification_ctx_.timer.async_wait([=](const error_code& ec) { - // we use timer cancellation as notification mechanism - if (ec == boost::asio::error::operation_aborted) { + notification_ctx_ = notification_context_t{ + boost::asio::steady_timer{ioc_}, boost::none, pk}; + notification_ctx_->timer.expires_after(LONG_POLL_TIMEOUT); + notification_ctx_->timer.async_wait([=](const error_code& ec) { + if (ec == boost::asio::error::operation_aborted) { + LOKI_LOG(trace, "Notification timer manually triggered"); + // we use timer cancellation as notification mechanism std::vector items; - auto msg = notification_ctx_.message; + auto msg = notification_ctx_->message; if (msg) { items.push_back(*msg); } respond_with_messages(items); } else { + LOKI_LOG(trace, "Notification timer expired"); // If we are here, the notification timer expired // 
with no messages ready respond_with_messages({}); } + + service_node_.remove_listener(pk, self.get()); }); } else { - respond_with_messages(items); } } @@ -992,17 +1011,21 @@ void connection_t::register_deadline() { auto self = shared_from_this(); - deadline_.async_wait([self](error_code ec) { + deadline_.async_wait([self = std::move(self)](error_code ec) { bool cancelled = (ec && ec == boost::asio::error::operation_aborted); if (ec && !cancelled) { LOKI_LOG(error, "Deadline timer error [{}]: {}", ec.value(), ec.message()); - } else { - LOKI_LOG(error, "[connection_t] socket timed out"); } - // Close socket to cancel any outstanding operation. + if (!cancelled) { + + if (self->notification_ctx_) { + self->service_node_.remove_listener( + self->notification_ctx_->pubkey, self.get()); + } + LOKI_LOG(debug, "Closing [connection_t] socket due to timeout"); self->do_close(); } }); @@ -1074,10 +1097,10 @@ void HttpClientSession::on_write(error_code ec, size_t bytes_transferred) { void HttpClientSession::on_read(error_code ec, size_t bytes_transferred) { - LOKI_LOG(trace, "Successfully received {} bytes.", bytes_transferred); - if (!ec || (ec == http::error::end_of_stream)) { + LOKI_LOG(trace, "Successfully received {} bytes.", bytes_transferred); + if (http::to_status_class(res_.result_int()) == http::status_class::successful) { std::shared_ptr body = @@ -1090,10 +1113,11 @@ void HttpClientSession::on_read(error_code ec, size_t bytes_transferred) { } } else { - /// Do we need to handle `operation aborted` separately here (due to - /// deadline timer)? - LOKI_LOG(error, "Error on read: {}. Message: {}", ec.value(), - ec.message()); + + if (ec != boost::asio::error::operation_aborted) { + LOKI_LOG(error, "Error on read: {}. 
Message: {}", ec.value(), + ec.message()); + } trigger_callback(SNodeError::ERROR_OTHER, nullptr); } } @@ -1120,7 +1144,10 @@ void HttpClientSession::start() { [self = shared_from_this()](const error_code& ec) { if (ec) { if (ec != boost::asio::error::operation_aborted) { - log_error(ec); + LOKI_LOG( + error, + "Deadline timer failed in http client session [{}: {}]", + ec.value(), ec.message()); } } else { LOKI_LOG(error, "client socket timed out"); @@ -1147,15 +1174,27 @@ HttpClientSession::~HttpClientSession() { sn_response_t{SNodeError::ERROR_OTHER, nullptr})); } - error_code ec; - // Gracefully close the socket - socket_.shutdown(tcp::socket::shutdown_both, ec); + if (!socket_.is_open()) { + LOKI_LOG(debug, "Socket is already closed"); + return; + } + error_code ec; + + /// From boost documentation: "For portable behaviour with respect to + /// graceful closure of a connected socket, call shutdown() before closing + /// the socket." + socket_.shutdown(tcp::socket::shutdown_both, ec); // not_connected happens sometimes so don't bother reporting it. if (ec && ec != boost::system::errc::not_connected) { + LOKI_LOG(error, "Socket shutdown failure [{}: {}]", ec.value(), + ec.message()); + } - LOKI_LOG(error, "ec: {}. 
Message: {}", ec.value(), ec.message()); - return; + socket_.close(ec); + + if (ec) { + LOKI_LOG(error, "On close socket [{}: {}]", ec.value(), ec.message()); } } diff --git a/httpserver/http_connection.h b/httpserver/http_connection.h index ba2481a..3c27864 100644 --- a/httpserver/http_connection.h +++ b/httpserver/http_connection.h @@ -184,7 +184,11 @@ class connection_t : public std::enable_shared_from_this { // the message is stored here momentarily; needed because // we can't pass it using current notification mechanism boost::optional message; - } notification_ctx_; + // Messenger public key that this connection is registered for + std::string pubkey; + }; + + boost::optional notification_ctx_; public: connection_t(boost::asio::io_context& ioc, ssl::context& ssl_ctx, @@ -194,13 +198,13 @@ class connection_t : public std::enable_shared_from_this { ~connection_t(); + // Connection index, mainly used for debugging + uint64_t conn_idx; + /// Initiate the asynchronous operations associated with the connection. 
void start(); - void notify(const message_t& msg); - - /// "Reset" the connection by sending an empty message list - void reset(); + void notify(boost::optional msg); private: void do_handshake(); diff --git a/httpserver/https_client.cpp b/httpserver/https_client.cpp index 4c79d43..b4ebbeb 100644 --- a/httpserver/https_client.cpp +++ b/httpserver/https_client.cpp @@ -22,8 +22,8 @@ void make_https_request(boost::asio::io_context& ioc, #else if (sn_address == "0.0.0.0") { - LOKI_LOG(error, "Could not initiate request to snode (we don't know " - "their IP yet)."); + LOKI_LOG(warn, "Could not initiate request to snode (we don't know " + "their IP yet)."); return; } @@ -201,8 +201,10 @@ void HttpsClientSession::start() { if (ec) { std::ostringstream os; os << endpoint; - LOKI_LOG(error, "[https client]: could not connect to {}, message: {} ({})", - os.str(), ec.message(), ec.value()); + LOKI_LOG( + error, + "[https client]: could not connect to {}, message: {} ({})", + os.str(), ec.message(), ec.value()); trigger_callback(SNodeError::NO_REACH, nullptr); return; } diff --git a/httpserver/service_node.cpp b/httpserver/service_node.cpp index 63acd51..06f20f2 100644 --- a/httpserver/service_node.cpp +++ b/httpserver/service_node.cpp @@ -368,9 +368,46 @@ void ServiceNode::send_sn_request(const std::shared_ptr& req, void ServiceNode::register_listener(const std::string& pk, const std::shared_ptr& c) { + + // NOTE: it is the responsibility of connection_t to deregister itself! 
pk_to_listeners[pk].push_back(c); - LOKI_LOG(debug, "register pubkey: {}, total pubkeys: {}", pk, + LOKI_LOG(debug, "Register pubkey: {}, total pubkeys: {}", pk, pk_to_listeners.size()); + + LOKI_LOG(debug, "Number of connections listening for {}: {}", pk, + pk_to_listeners[pk].size()); +} + +void ServiceNode::remove_listener(const std::string& pk, + const connection_t* const c) { + + const auto it = pk_to_listeners.find(pk); + if (it == pk_to_listeners.end()) { + /// This will sometimes happen because we reset all listeners on + /// push_all + LOKI_LOG(debug, "Trying to remove an unknown pk from the notification " + "map. Operation ignored."); + } else { + LOKI_LOG(trace, + "Deregistering notification for connection {} for pk {}", + c->conn_idx, pk); + auto& cs = it->second; + const auto new_end = std::remove_if( + cs.begin(), cs.end(), [c](const std::shared_ptr& e) { + return e.get() == c; + }); + const auto count = std::distance(new_end, cs.end()); + cs.erase(new_end, cs.end()); + + if (count == 0) { + LOKI_LOG(debug, "Connection {} in not registered for pk {}", + c->conn_idx, pk); + } else if (count > 1) { + LOKI_LOG(debug, + "Multiple registrations ({}) for connection {} for pk {}", + count, c->conn_idx, pk); + } + } } void ServiceNode::notify_listeners(const std::string& pk, @@ -400,7 +437,8 @@ void ServiceNode::reset_listeners() { /// simplicity for (auto& entry : pk_to_listeners) { for (auto& c : entry.second) { - c->reset(); + /// notify with no messages + c->notify(boost::none); } } @@ -868,7 +906,7 @@ bool ServiceNode::derive_tester_testee(uint64_t blk_height, sn_record_t& tester, members.push_back(our_address_); if (members.size() < 2) { - LOKI_LOG(error, "Could not initiate peer test: swarm too small"); + LOKI_LOG(warn, "Could not initiate peer test: swarm too small"); return false; } diff --git a/httpserver/service_node.h b/httpserver/service_node.h index b304a7c..0f218a5 100644 --- a/httpserver/service_node.h +++ b/httpserver/service_node.h @@ -218,6 
+218,9 @@ class ServiceNode { void register_listener(const std::string& pk, const connection_ptr& connection); + void remove_listener(const std::string& pk, + const http_server::connection_t* const connection); + // Notify listeners of a new message for pk void notify_listeners(const std::string& pk, const message_t& msg);