Merge pull request #79 from msgmaxim/resend-on-failure

Retry pushing single messages on failure
This commit is contained in:
Maxim Shishmarev 2019-04-23 16:12:58 +10:00 committed by GitHub
commit e7aa6a00d0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 141 additions and 42 deletions

View file

@ -65,7 +65,8 @@ void make_http_request(boost::asio::io_context& ioc, std::string sn_address,
}
endpoint.port(port);
auto session = std::make_shared<HttpClientSession>(ioc, endpoint, req, cb);
auto session =
std::make_shared<HttpClientSession>(ioc, endpoint, req, std::move(cb));
session->start();
}
@ -797,7 +798,7 @@ void connection_t::register_deadline() {
/// TODO: make generic, avoid message copy
HttpClientSession::HttpClientSession(boost::asio::io_context& ioc,
const tcp::endpoint& ep,
const request_t& req, http_callback_t cb)
const request_t& req, http_callback_t&& cb)
: ioc_(ioc), socket_(ioc), endpoint_(ep), callback_(cb),
deadline_timer_(ioc) {
@ -825,6 +826,7 @@ void HttpClientSession::on_write(error_code ec, size_t bytes_transferred) {
if (ec) {
BOOST_LOG_TRIVIAL(error) << "Error on write, ec: " << ec.value()
<< ". Message: " << ec.message();
trigger_callback(SNodeError::ERROR_OTHER, nullptr);
return;
}
@ -854,11 +856,11 @@ void HttpClientSession::on_read(error_code ec, size_t bytes_transferred) {
} else {
BOOST_LOG_TRIVIAL(error)
<< "Error on read: " << ec.value() << ". Message: " << ec.message();
trigger_callback(SNodeError::ERROR_OTHER, nullptr);
}
// Gracefully close the socket
socket_.shutdown(tcp::socket::shutdown_both, ec);
deadline_timer_.cancel();
// not_connected happens sometimes so don't bother reporting it.
if (ec && ec != boost::system::errc::not_connected) {
@ -868,43 +870,46 @@ void HttpClientSession::on_read(error_code ec, size_t bytes_transferred) {
return;
}
init_callback(std::move(body));
trigger_callback(SNodeError::NO_ERROR, std::move(body));
// If we get here then the connection is closed gracefully
}
void HttpClientSession::start() {
auto self = shared_from_this();
socket_.async_connect(endpoint_, [=](const error_code& ec) {
/// TODO: I think I should just call again if ec == EINTR
if (ec) {
BOOST_LOG_TRIVIAL(error)
<< boost::format(
"Could not connect to %1%, message: %2% (%3%)") %
endpoint_ % ec.message() % ec.value();
callback_({SNodeError::NO_REACH, nullptr});
return;
}
socket_.async_connect(
endpoint_, [this, self = shared_from_this()](const error_code& ec) {
/// TODO: I think I should just call again if ec == EINTR
if (ec) {
BOOST_LOG_TRIVIAL(error)
<< boost::format(
"Could not connect to %1%, message: %2% (%3%)") %
endpoint_ % ec.message() % ec.value();
trigger_callback(SNodeError::NO_REACH, nullptr);
return;
}
self->on_connect();
});
self->on_connect();
});
deadline_timer_.expires_after(SESSION_TIME_LIMIT);
deadline_timer_.async_wait([self](const error_code& ec) {
if (ec) {
if (ec != boost::asio::error::operation_aborted) {
log_error(ec);
deadline_timer_.async_wait(
[self = shared_from_this()](const error_code& ec) {
if (ec) {
if (ec != boost::asio::error::operation_aborted) {
log_error(ec);
}
} else {
BOOST_LOG_TRIVIAL(error) << "client socket timed out";
self->socket_.close();
}
} else {
BOOST_LOG_TRIVIAL(error) << "client socket timed out";
self->socket_.close();
}
});
});
}
void HttpClientSession::init_callback(std::shared_ptr<std::string>&& body) {
ioc_.post(std::bind(callback_, sn_response_t{SNodeError::NO_ERROR, body}));
void HttpClientSession::trigger_callback(SNodeError error,
std::shared_ptr<std::string>&& body) {
ioc_.post(std::bind(callback_, sn_response_t{error, body}));
used_callback_ = true;
deadline_timer_.cancel();
}
/// We execute callback (if haven't already) here to make sure it is called

View file

@ -77,12 +77,13 @@ class HttpClientSession
void on_read(boost::system::error_code ec, std::size_t bytes_transferred);
void init_callback(std::shared_ptr<std::string>&& body);
void trigger_callback(SNodeError error,
std::shared_ptr<std::string>&& body);
public:
// Resolver and socket require an io_context
HttpClientSession(boost::asio::io_context& ioc, const tcp::endpoint& ep,
const request_t& req, http_callback_t cb);
const request_t& req, http_callback_t&& cb);
// initiate the client connection
void start();

View file

@ -26,6 +26,57 @@ using service_node::storage::Item;
namespace loki {
using http_server::connection_t;
constexpr std::array<std::chrono::seconds, 5> RETRY_INTERVALS = {
std::chrono::seconds(5), std::chrono::seconds(10), std::chrono::seconds(20),
std::chrono::seconds(40), std::chrono::seconds(80)};
FailedRequestHandler::FailedRequestHandler(boost::asio::io_context& ioc,
const sn_record_t& sn,
std::shared_ptr<request_t> req)
: ioc_(ioc), retry_timer_(ioc), sn_(sn), request_(std::move(req)) {}
void FailedRequestHandler::retry(std::shared_ptr<FailedRequestHandler>&& self) {
attempt_count_ += 1;
if (attempt_count_ > RETRY_INTERVALS.size()) {
BOOST_LOG_TRIVIAL(debug)
<< "Gave up after " << attempt_count_ << " attempts";
return;
}
retry_timer_.expires_after(RETRY_INTERVALS[attempt_count_ - 1]);
BOOST_LOG_TRIVIAL(debug)
<< "Will retry in " << RETRY_INTERVALS[attempt_count_ - 1].count()
<< " secs";
retry_timer_.async_wait(
[self = std::move(self)](const boost::system::error_code& ec) mutable {
/// Save some references before possibly moved out of `self`
const auto& sn = self->sn_;
auto& ioc = self->ioc_;
const auto& req = *self->request_;
/// Request will be copied here
make_http_request(
ioc, sn.address, sn.port, req,
[self = std::move(self)](sn_response_t&& res) mutable {
if (res.error_code != SNodeError::NO_ERROR) {
BOOST_LOG_TRIVIAL(error)
<< "Could not relay one: " << self->sn_
<< " (attempt #" << self->attempt_count_ << ")";
/// TODO: record failure here as well?
self->retry(std::move(self));
}
});
});
}
FailedRequestHandler::~FailedRequestHandler() {
BOOST_LOG_TRIVIAL(trace) << "~FailedRequestHandler()";
}
void FailedRequestHandler::init_timer() { retry(shared_from_this()); }
/// TODO: can we reuse context (reset it)?
std::string hash_data(std::string data) {
@ -85,20 +136,29 @@ ServiceNode::ServiceNode(boost::asio::io_context& ioc, uint16_t port,
ServiceNode::~ServiceNode() = default;
void ServiceNode::relay_one(const message_t& msg, sn_record_t sn) const {
void ServiceNode::relay_one(const std::shared_ptr<request_t>& req,
sn_record_t sn) const {
BOOST_LOG_TRIVIAL(debug) << "Relaying a message to " << sn;
request_t req;
serialize_message(req.body(), msg);
req.target("/v1/swarms/push");
BOOST_LOG_TRIVIAL(debug) << "Relaying a message to: " << sn;
// TODO: consider storing a shared_ptr inside http session instead of a copy
// (that might be annoying for requests that don't want to create a
// shared_ptr, like swarm updates)
make_http_request(
ioc_, sn.address, sn.port, req, [this, sn](sn_response_t&& res) {
ioc_, sn.address, sn.port, *req, [this, sn, req](sn_response_t&& res) {
if (res.error_code != SNodeError::NO_ERROR) {
BOOST_LOG_TRIVIAL(error) << "Could not relay one to: " << sn;
snode_report_[sn].relay_fails += 1;
if (res.error_code == SNodeError::NO_REACH) {
BOOST_LOG_TRIVIAL(error)
<< "Could not relay one to: " << sn << " (Unreachable)";
} else if (res.error_code == SNodeError::ERROR_OTHER) {
BOOST_LOG_TRIVIAL(error) << "Could not relay one to: " << sn
<< " (Generic error)";
}
std::make_shared<FailedRequestHandler>(ioc_, sn, req)
->init_timer();
}
});
}
@ -173,9 +233,13 @@ void ServiceNode::push_message(const message_t& msg) {
BOOST_LOG_TRIVIAL(debug)
<< "push_message to " << others.size() << " other nodes";
auto req = std::make_shared<request_t>();
serialize_message(req->body(), msg);
req->target("/v1/swarms/push");
for (const auto& address : others) {
/// send a request asynchronously
relay_one(msg, address);
relay_one(req, address);
}
}
@ -207,7 +271,7 @@ void ServiceNode::save_if_new(const message_t& msg) {
if (db_->store(msg.hash, msg.pub_key, msg.data, msg.ttl, msg.timestamp,
msg.nonce)) {
notify_listeners(msg.pub_key, msg);
BOOST_LOG_TRIVIAL(trace) << "saved message: " << msg.data;
BOOST_LOG_TRIVIAL(debug) << "saved message: " << msg.data;
}
}

View file

@ -7,6 +7,7 @@
#include <unordered_map>
#include <boost/asio.hpp>
#include <boost/beast/http.hpp>
#include <boost/optional.hpp>
#include "common.h"
@ -22,6 +23,9 @@ class Item;
} // namespace storage
} // namespace service_node
namespace http = boost::beast::http;
using request_t = http::request<http::string_body>;
namespace loki {
namespace http_server {
@ -38,6 +42,30 @@ struct snode_stats_t {
uint64_t relay_fails = 0;
};
/// Represents failed attempt at communicating with a SNode
/// (currently only for single messages)
class FailedRequestHandler
: public std::enable_shared_from_this<FailedRequestHandler> {
boost::asio::io_context& ioc_;
boost::asio::steady_timer retry_timer_;
sn_record_t sn_;
const std::shared_ptr<request_t> request_;
uint32_t attempt_count_ = 0;
void retry(std::shared_ptr<FailedRequestHandler>&& self);
public:
FailedRequestHandler(boost::asio::io_context& ioc, const sn_record_t& sn,
std::shared_ptr<request_t> req);
~FailedRequestHandler();
/// Initiates the timer for retrying (which cannot be done directly in
/// the constructor as it is not possible to create a shared ptr
/// to itself before the construction is done)
void init_timer();
};
/// All service node logic that is not network-specific
class ServiceNode {
using pub_key_t = std::string;
@ -78,7 +106,8 @@ class ServiceNode {
void salvage_data() const;
/// used on push and on swarm bootstrapping
void relay_one(const message_t& msg, sn_record_t address) const;
void relay_one(const std::shared_ptr<request_t>& req,
sn_record_t address) const;
/// used for SN bootstrapping
void relay_batch(const std::string& data, sn_record_t address) const;