Don't forget to unsubscribe connections from message notifications (#214)

Maxim Shishmarev 2019-07-05 16:49:26 +10:00 committed by GitHub
parent d1d003e688
commit bcd6408831
5 changed files with 143 additions and 57 deletions

View file

@@ -48,10 +48,6 @@ constexpr auto TEST_RETRY_PERIOD = std::chrono::milliseconds(50);
// (rounded up)
constexpr size_t MAX_MESSAGE_BODY = 3100;
static void log_error(const error_code& ec) {
LOKI_LOG(error, "Error{}: {}", ec.value(), ec.message());
}
void make_http_request(boost::asio::io_context& ioc,
const std::string& sn_address, uint16_t port,
const std::shared_ptr<request_t>& req,
@@ -152,8 +148,10 @@ accept_connection(boost::asio::io_context& ioc,
security)
->start();
if (ec)
log_error(ec);
if (ec) {
LOKI_LOG(error, "Could not accept a new connection {}: {}",
ec.value(), ec.message());
}
accept_connection(ioc, ssl_ctx, acceptor, sn, channel_encryption,
rate_limiter, security);
@@ -193,17 +191,23 @@ connection_t::connection_t(boost::asio::io_context& ioc, ssl::context& ssl_ctx,
: ioc_(ioc), ssl_ctx_(ssl_ctx), socket_(std::move(socket)),
stream_(socket_, ssl_ctx_), service_node_(sn),
channel_cipher_(channel_encryption), rate_limiter_(rate_limiter),
repeat_timer_(ioc), deadline_(ioc, SESSION_TIME_LIMIT),
notification_ctx_({boost::asio::steady_timer{ioc}, boost::none}),
repeat_timer_(ioc),
deadline_(ioc, SESSION_TIME_LIMIT), notification_ctx_{boost::none},
security_(security) {
LOKI_LOG(trace, "connection_t");
static uint64_t instance_counter = 0;
conn_idx = instance_counter++;
LOKI_LOG(debug, "connection_t [{}]", conn_idx);
start_timestamp_ = std::chrono::steady_clock::now();
}
connection_t::~connection_t() {
// TODO: should check if we are still registered for
// notifications, and deregister if so.
// Safety net
if (stream_.lowest_layer().is_open()) {
LOKI_LOG(warn, "Client socket should be closed by this point, but "
@@ -211,7 +215,7 @@ connection_t::~connection_t() {
stream_.lowest_layer().close();
}
LOKI_LOG(trace, "~connection_t");
LOKI_LOG(debug, "~connection_t [{}]", conn_idx);
}
void connection_t::start() {
@@ -236,17 +240,21 @@ void connection_t::on_handshake(boost::system::error_code ec) {
read_request();
}
void connection_t::notify(const message_t& msg) {
LOKI_LOG(debug, "Processing message notification: {}", msg.data);
// save messages, so we can access them once the timer event happens
notification_ctx_.message = msg;
// the timer callback will be called once we complete the current callback
notification_ctx_.timer.cancel();
}
void connection_t::notify(boost::optional<const message_t&> msg) {
void connection_t::reset() {
LOKI_LOG(debug, "Resetting the connection");
notification_ctx_.timer.cancel();
if (!notification_ctx_) {
LOKI_LOG(error,
"Trying to notify a connection without notification context");
return;
}
if (msg) {
LOKI_LOG(trace, "Processing message notification: {}", msg->data);
// save messages, so we can access them once the timer event happens
notification_ctx_->message = msg;
}
// the timer callback will be called once we complete the current callback
notification_ctx_->timer.cancel();
}
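
To make the mechanism explicit: notify() stashes the payload in the optional notification context and then cancels the timer, so the handler parked in async_wait runs early with operation_aborted. Below is a minimal sketch of that notify side, assuming Boost.Asio and Boost.Optional; ctx_t, the free notify() function and main() are illustrative stand-ins, not code from this repository.

#include <boost/asio.hpp>
#include <boost/optional.hpp>
#include <chrono>
#include <iostream>
#include <string>

struct ctx_t {
    boost::asio::steady_timer timer;
    boost::optional<std::string> message;
};

void notify(boost::optional<ctx_t>& ctx, boost::optional<std::string> msg) {
    if (!ctx) {
        // No long poll in flight for this connection; nothing to wake up.
        std::cerr << "notify called without a notification context\n";
        return;
    }
    if (msg)
        ctx->message = std::move(msg); // the woken handler reads this value
    // Cancelling completes the pending async_wait with operation_aborted.
    ctx->timer.cancel();
}

int main() {
    boost::asio::io_context ioc;
    boost::optional<ctx_t> ctx;
    ctx = ctx_t{boost::asio::steady_timer{ioc}, boost::none};
    ctx->timer.expires_after(std::chrono::seconds(30));
    ctx->timer.async_wait([&](const boost::system::error_code& ec) {
        if (ec == boost::asio::error::operation_aborted)
            std::cout << "woken early: " << ctx->message.value_or("<none>") << '\n';
        else
            std::cout << "long-poll timeout\n";
    });
    notify(ctx, std::string{"hello"});
    ioc.run();
}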
// Asynchronously receive a complete request message.
@@ -258,7 +266,10 @@ void connection_t::read_request() {
LOKI_LOG(trace, "on data: {} bytes", bytes_transferred);
if (ec) {
log_error(ec);
LOKI_LOG(
error,
"Failed to read from a socket [{}: {}], connection idx: {}",
ec.value(), ec.message(), self->conn_idx);
return;
}
@@ -266,7 +277,8 @@
try {
self->process_request();
} catch (const std::exception& e) {
LOKI_LOG(error, "Exception caught: {}", e.what());
LOKI_LOG(error, "Exception caught processing a request: {}",
e.what());
self->body_stream_ << e.what();
}
@@ -348,7 +360,9 @@ void connection_t::process_storage_test_req(uint64_t height,
tester_addr](const error_code& ec) {
if (ec) {
if (ec != boost::asio::error::operation_aborted) {
log_error(ec);
LOKI_LOG(error,
"Repeat timer failed for storage test [{}: {}]",
ec.value(), ec.message());
}
} else {
self->process_storage_test_req(height, tester_addr, msg_hash);
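
For reference, the repeat timer here drives a retry loop: a normal completion re-runs the storage test, operation_aborted means the retry was cancelled deliberately, and anything else is a genuine timer failure. A rough sketch of that pattern under those assumptions (retry_after is an illustrative helper, not the project's API):

#include <boost/asio.hpp>
#include <chrono>
#include <functional>
#include <iostream>

namespace asio = boost::asio;

// Re-run `operation` after `delay`, unless the timer is cancelled first.
void retry_after(asio::steady_timer& timer, std::chrono::milliseconds delay,
                 std::function<void()> operation) {
    timer.expires_after(delay);
    timer.async_wait([operation = std::move(operation)](
                         const boost::system::error_code& ec) {
        if (ec) {
            if (ec != asio::error::operation_aborted)
                std::cerr << "repeat timer failed: " << ec.message() << '\n';
            return; // cancelled (or broken): stop retrying
        }
        operation(); // e.g. re-run the storage test for the same height/hash
    });
}

int main() {
    asio::io_context ioc;
    asio::steady_timer timer{ioc};
    retry_after(timer, std::chrono::milliseconds(50),
                [] { std::cout << "retrying storage test\n"; });
    ioc.run();
}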
@@ -862,27 +876,32 @@ void connection_t::poll_db(const std::string& pk,
// until new data arrives for this PubKey
service_node_.register_listener(pk, self);
notification_ctx_.timer.expires_after(LONG_POLL_TIMEOUT);
notification_ctx_.timer.async_wait([=](const error_code& ec) {
// we use timer cancellation as notification mechanism
if (ec == boost::asio::error::operation_aborted) {
notification_ctx_ = notification_context_t{
boost::asio::steady_timer{ioc_}, boost::none, pk};
notification_ctx_->timer.expires_after(LONG_POLL_TIMEOUT);
notification_ctx_->timer.async_wait([=](const error_code& ec) {
if (ec == boost::asio::error::operation_aborted) {
LOKI_LOG(trace, "Notification timer manually triggered");
// we use timer cancellation as notification mechanism
std::vector<message_t> items;
auto msg = notification_ctx_.message;
auto msg = notification_ctx_->message;
if (msg) {
items.push_back(*msg);
}
respond_with_messages(items);
} else {
LOKI_LOG(trace, "Notification timer expired");
// If we are here, the notification timer expired
// with no messages ready
respond_with_messages<Item>({});
}
service_node_.remove_listener(pk, self.get());
});
} else {
respond_with_messages(items);
}
}
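
This poll_db() change is the core of the commit: once a connection registers as a listener for a pubkey, the long-poll handler must deregister it on every exit path, whether it was woken by a notification or the timer simply expired. A condensed sketch of that lifecycle, assuming Boost.Asio and Boost.Optional; respond and deregister stand in for respond_with_messages() and service_node_.remove_listener(), and the real code keeps the connection alive via shared_from_this() rather than raw references.

#include <boost/asio.hpp>
#include <boost/optional.hpp>
#include <chrono>
#include <functional>
#include <iostream>
#include <string>

namespace asio = boost::asio;

struct poll_ctx_t {
    asio::steady_timer timer;
    boost::optional<std::string> message;
    std::string pubkey;
};

void start_long_poll(boost::optional<poll_ctx_t>& ctx, asio::io_context& ioc,
                     const std::string& pk,
                     std::function<void(boost::optional<std::string>)> respond,
                     std::function<void()> deregister) {
    // Fresh context (and timer) for this poll; the real code registers the
    // connection as a listener for `pk` just before arming the timer.
    ctx = poll_ctx_t{asio::steady_timer{ioc}, boost::none, pk};
    ctx->timer.expires_after(std::chrono::seconds(20)); // LONG_POLL_TIMEOUT stand-in
    ctx->timer.async_wait([&ctx, respond, deregister](
                              const boost::system::error_code& ec) {
        if (ec == asio::error::operation_aborted)
            respond(ctx->message); // woken by notify(): deliver the stored message
        else
            respond(boost::none);  // timer expired: reply with an empty list
        deregister();              // unsubscribe on both paths (the fix in #214)
    });
}

int main() {
    asio::io_context ioc;
    boost::optional<poll_ctx_t> ctx;
    start_long_poll(ctx, ioc, "pubkey",
                    [](boost::optional<std::string> m) {
                        std::cout << (m ? *m : std::string{"<empty>"}) << '\n';
                    },
                    [] { std::cout << "listener removed\n"; });
    ioc.run(); // times out after ~20s unless something cancels ctx->timer
}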
@@ -992,17 +1011,21 @@ void connection_t::register_deadline() {
auto self = shared_from_this();
deadline_.async_wait([self](error_code ec) {
deadline_.async_wait([self = std::move(self)](error_code ec) {
bool cancelled = (ec && ec == boost::asio::error::operation_aborted);
if (ec && !cancelled) {
LOKI_LOG(error, "Deadline timer error [{}]: {}", ec.value(),
ec.message());
} else {
LOKI_LOG(error, "[connection_t] socket timed out");
}
// Close socket to cancel any outstanding operation.
if (!cancelled) {
if (self->notification_ctx_) {
self->service_node_.remove_listener(
self->notification_ctx_->pubkey, self.get());
}
LOKI_LOG(debug, "Closing [connection_t] socket due to timeout");
self->do_close();
}
});
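
The deadline handler now also removes the connection from the listener map before closing the socket, so a timed-out long-poll no longer leaves a stale registration behind. A minimal sketch of the cancellation-versus-timeout logic, assuming Boost.Asio; register_deadline and on_timeout here are illustrative stand-ins for the member function and the remove_listener()/do_close() pair.

#include <boost/asio.hpp>
#include <chrono>
#include <functional>
#include <iostream>

namespace asio = boost::asio;

void register_deadline(asio::steady_timer& deadline, std::chrono::seconds limit,
                       std::function<void()> on_timeout) {
    deadline.expires_after(limit);
    deadline.async_wait([on_timeout = std::move(on_timeout)](
                            const boost::system::error_code& ec) {
        if (ec == asio::error::operation_aborted)
            return; // the session finished before the deadline; nothing to do
        if (ec)
            std::cerr << "deadline timer error: " << ec.message() << '\n';
        // Timed out: deregister any listeners, then close the socket so that
        // outstanding reads/writes complete with operation_aborted.
        on_timeout();
    });
}

int main() {
    asio::io_context ioc;
    asio::steady_timer deadline{ioc};
    register_deadline(deadline, std::chrono::seconds(1),
                      [] { std::cout << "session timed out\n"; });
    ioc.run();
}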
@@ -1074,10 +1097,10 @@ void HttpClientSession::on_write(error_code ec, size_t bytes_transferred) {
void HttpClientSession::on_read(error_code ec, size_t bytes_transferred) {
LOKI_LOG(trace, "Successfully received {} bytes.", bytes_transferred);
if (!ec || (ec == http::error::end_of_stream)) {
LOKI_LOG(trace, "Successfully received {} bytes.", bytes_transferred);
if (http::to_status_class(res_.result_int()) ==
http::status_class::successful) {
std::shared_ptr<std::string> body =
@@ -1090,10 +1113,11 @@ void HttpClientSession::on_read(error_code ec, size_t bytes_transferred) {
}
} else {
/// Do we need to handle `operation aborted` separately here (due to
/// deadline timer)?
LOKI_LOG(error, "Error on read: {}. Message: {}", ec.value(),
ec.message());
if (ec != boost::asio::error::operation_aborted) {
LOKI_LOG(error, "Error on read: {}. Message: {}", ec.value(),
ec.message());
}
trigger_callback(SNodeError::ERROR_OTHER, nullptr);
}
}
@@ -1120,7 +1144,10 @@ void HttpClientSession::start() {
[self = shared_from_this()](const error_code& ec) {
if (ec) {
if (ec != boost::asio::error::operation_aborted) {
log_error(ec);
LOKI_LOG(
error,
"Deadline timer failed in http client session [{}: {}]",
ec.value(), ec.message());
}
} else {
LOKI_LOG(error, "client socket timed out");
@@ -1147,15 +1174,27 @@ HttpClientSession::~HttpClientSession() {
sn_response_t{SNodeError::ERROR_OTHER, nullptr}));
}
error_code ec;
// Gracefully close the socket
socket_.shutdown(tcp::socket::shutdown_both, ec);
if (!socket_.is_open()) {
LOKI_LOG(debug, "Socket is already closed");
return;
}
error_code ec;
/// From boost documentation: "For portable behaviour with respect to
/// graceful closure of a connected socket, call shutdown() before closing
/// the socket."
socket_.shutdown(tcp::socket::shutdown_both, ec);
// not_connected happens sometimes so don't bother reporting it.
if (ec && ec != boost::system::errc::not_connected) {
LOKI_LOG(error, "Socket shutdown failure [{}: {}]", ec.value(),
ec.message());
}
LOKI_LOG(error, "ec: {}. Message: {}", ec.value(), ec.message());
return;
socket_.close(ec);
if (ec) {
LOKI_LOG(error, "On close socket [{}: {}]", ec.value(), ec.message());
}
}
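
The destructor now follows the Boost guidance quoted above: shut the socket down first, tolerate not_connected, then close. A compact sketch of that sequence on a plain TCP socket, mirroring the destructor's steps (close_gracefully is an illustrative helper, not part of the codebase):

#include <boost/asio.hpp>
#include <iostream>

using tcp = boost::asio::ip::tcp;

void close_gracefully(tcp::socket& socket) {
    if (!socket.is_open())
        return; // already closed elsewhere

    boost::system::error_code ec;
    // Shut down both directions first for portable, graceful closure.
    socket.shutdown(tcp::socket::shutdown_both, ec);
    // not_connected is expected when the peer already dropped the connection.
    if (ec && ec != boost::system::errc::not_connected)
        std::cerr << "shutdown failed: " << ec.message() << '\n';

    socket.close(ec);
    if (ec)
        std::cerr << "close failed: " << ec.message() << '\n';
}

int main() {
    boost::asio::io_context ioc;
    tcp::socket socket{ioc};
    close_gracefully(socket); // no-op here: the socket was never opened
}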

View file

@@ -184,7 +184,11 @@ class connection_t : public std::enable_shared_from_this<connection_t> {
// the message is stored here momentarily; needed because
// we can't pass it using current notification mechanism
boost::optional<message_t> message;
} notification_ctx_;
// Messenger public key that this connection is registered for
std::string pubkey;
};
boost::optional<notification_context_t> notification_ctx_;
public:
connection_t(boost::asio::io_context& ioc, ssl::context& ssl_ctx,
@@ -194,13 +198,13 @@ class connection_t : public std::enable_shared_from_this<connection_t> {
~connection_t();
// Connection index, mainly used for debugging
uint64_t conn_idx;
/// Initiate the asynchronous operations associated with the connection.
void start();
void notify(const message_t& msg);
/// "Reset" the connection by sending an empty message list
void reset();
void notify(boost::optional<const message_t&> msg);
private:
void do_handshake();
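
A note on the new notify(boost::optional<const message_t&>) signature: unlike std::optional, boost::optional can wrap a reference, so callers either pass an existing message without copying it or pass boost::none to request an empty response (replacing the old reset()). A small illustration with a stand-in msg_t type:

#include <boost/optional.hpp>
#include <iostream>
#include <string>

struct msg_t {
    std::string data;
};

void notify(boost::optional<const msg_t&> msg) {
    if (msg)
        std::cout << "deliver: " << msg->data << '\n'; // bound by reference, no copy
    else
        std::cout << "no message: respond with an empty list\n";
}

int main() {
    const msg_t m{"stored message"};
    notify(m);           // wraps a const reference to m
    notify(boost::none); // what reset_listeners() now does for each connection
}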

View file

@@ -22,8 +22,8 @@ void make_https_request(boost::asio::io_context& ioc,
#else
if (sn_address == "0.0.0.0") {
LOKI_LOG(error, "Could not initiate request to snode (we don't know "
"their IP yet).");
LOKI_LOG(warn, "Could not initiate request to snode (we don't know "
"their IP yet).");
return;
}
@@ -201,8 +201,10 @@ void HttpsClientSession::start() {
if (ec) {
std::ostringstream os;
os << endpoint;
LOKI_LOG(error, "[https client]: could not connect to {}, message: {} ({})",
os.str(), ec.message(), ec.value());
LOKI_LOG(
error,
"[https client]: could not connect to {}, message: {} ({})",
os.str(), ec.message(), ec.value());
trigger_callback(SNodeError::NO_REACH, nullptr);
return;
}

View file

@@ -368,9 +368,46 @@ void ServiceNode::send_sn_request(const std::shared_ptr<request_t>& req,
void ServiceNode::register_listener(const std::string& pk,
const std::shared_ptr<connection_t>& c) {
// NOTE: it is the responsibility of connection_t to deregister itself!
pk_to_listeners[pk].push_back(c);
LOKI_LOG(debug, "register pubkey: {}, total pubkeys: {}", pk,
LOKI_LOG(debug, "Register pubkey: {}, total pubkeys: {}", pk,
pk_to_listeners.size());
LOKI_LOG(debug, "Number of connections listening for {}: {}", pk,
pk_to_listeners[pk].size());
}
void ServiceNode::remove_listener(const std::string& pk,
const connection_t* const c) {
const auto it = pk_to_listeners.find(pk);
if (it == pk_to_listeners.end()) {
/// This will sometimes happen because we reset all listeners on
/// push_all
LOKI_LOG(debug, "Trying to remove an unknown pk from the notification "
"map. Operation ignored.");
} else {
LOKI_LOG(trace,
"Deregistering notification for connection {} for pk {}",
c->conn_idx, pk);
auto& cs = it->second;
const auto new_end = std::remove_if(
cs.begin(), cs.end(), [c](const std::shared_ptr<connection_t>& e) {
return e.get() == c;
});
const auto count = std::distance(new_end, cs.end());
cs.erase(new_end, cs.end());
if (count == 0) {
LOKI_LOG(debug, "Connection {} in not registered for pk {}",
c->conn_idx, pk);
} else if (count > 1) {
LOKI_LOG(debug,
"Multiple registrations ({}) for connection {} for pk {}",
count, c->conn_idx, pk);
}
}
}
void ServiceNode::notify_listeners(const std::string& pk,
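
register_listener() and remove_listener() above maintain a pubkey-to-connections map; removal uses the erase-remove idiom and counts how many entries matched so that missing or duplicate registrations can be logged. A self-contained sketch of that bookkeeping (conn_t and listener_registry_t are illustrative names, not the real types):

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <vector>

struct conn_t {
    uint64_t conn_idx = 0;
};

class listener_registry_t {
    std::map<std::string, std::vector<std::shared_ptr<conn_t>>> pk_to_listeners_;

  public:
    void register_listener(const std::string& pk,
                           const std::shared_ptr<conn_t>& c) {
        // NOTE: the connection is responsible for deregistering itself later.
        pk_to_listeners_[pk].push_back(c);
    }

    void remove_listener(const std::string& pk, const conn_t* c) {
        const auto it = pk_to_listeners_.find(pk);
        if (it == pk_to_listeners_.end())
            return; // can happen when all listeners were already reset
        auto& cs = it->second;
        // Erase-remove: move matching entries to the end, then erase them.
        const auto new_end = std::remove_if(
            cs.begin(), cs.end(),
            [c](const std::shared_ptr<conn_t>& e) { return e.get() == c; });
        const auto count = std::distance(new_end, cs.end());
        cs.erase(new_end, cs.end());
        if (count == 0)
            std::cerr << "connection " << c->conn_idx
                      << " is not registered for this pk\n";
    }
};

int main() {
    listener_registry_t registry;
    const auto c = std::make_shared<conn_t>();
    registry.register_listener("pubkey", c);
    registry.remove_listener("pubkey", c.get());
}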
@@ -400,7 +437,8 @@ void ServiceNode::reset_listeners() {
/// simplicity
for (auto& entry : pk_to_listeners) {
for (auto& c : entry.second) {
c->reset();
/// notify with no messages
c->notify(boost::none);
}
}
@@ -868,7 +906,7 @@ bool ServiceNode::derive_tester_testee(uint64_t blk_height, sn_record_t& tester,
members.push_back(our_address_);
if (members.size() < 2) {
LOKI_LOG(error, "Could not initiate peer test: swarm too small");
LOKI_LOG(warn, "Could not initiate peer test: swarm too small");
return false;
}

View file

@@ -218,6 +218,9 @@ class ServiceNode {
void register_listener(const std::string& pk,
const connection_ptr& connection);
void remove_listener(const std::string& pk,
const http_server::connection_t* const connection);
// Notify listeners of a new message for pk
void notify_listeners(const std::string& pk, const message_t& msg);