Derive tester/testee based on older (more stable) block

This commit is contained in:
Maxim Shishmarev 2019-09-03 16:45:04 +10:00
parent 56ddc9240d
commit a0f152018f
4 changed files with 97 additions and 37 deletions

View file

@ -342,15 +342,20 @@ bool connection_t::verify_signature(const std::string& signature,
} }
void connection_t::process_storage_test_req(uint64_t height, void connection_t::process_storage_test_req(uint64_t height,
const std::string& tester_addr, const std::string& tester_pk,
const std::string& msg_hash) { const std::string& msg_hash) {
LOKI_LOG(trace, "Performing storage test, attempt: {}", repetition_count_); LOKI_LOG(trace, "Performing storage test, attempt: {}", repetition_count_);
std::string answer; std::string answer;
/// TODO: we never actually test that `height` is within any reasonable
/// time window (or that it is not repeated multiple times), we should do that!
/// This is done implicitly to some degree using `block_hashes_cache_`, which
/// holds a limited number of recent blocks only and fails if an earlier block
/// is requested
const MessageTestStatus status = service_node_.process_storage_test_req( const MessageTestStatus status = service_node_.process_storage_test_req(
height, tester_addr, msg_hash, answer); height, tester_pk, msg_hash, answer);
const auto elapsed_time = const auto elapsed_time =
std::chrono::steady_clock::now() - start_timestamp_; std::chrono::steady_clock::now() - start_timestamp_;
if (status == MessageTestStatus::SUCCESS) { if (status == MessageTestStatus::SUCCESS) {
@ -369,7 +374,7 @@ void connection_t::process_storage_test_req(uint64_t height,
repeat_timer_.expires_after(TEST_RETRY_PERIOD); repeat_timer_.expires_after(TEST_RETRY_PERIOD);
repeat_timer_.async_wait([self = shared_from_this(), height, msg_hash, repeat_timer_.async_wait([self = shared_from_this(), height, msg_hash,
tester_addr](const error_code& ec) { tester_pk](const error_code& ec) {
if (ec) { if (ec) {
if (ec != boost::asio::error::operation_aborted) { if (ec != boost::asio::error::operation_aborted) {
LOKI_LOG(error, LOKI_LOG(error,
@ -377,7 +382,7 @@ void connection_t::process_storage_test_req(uint64_t height,
ec.value(), ec.message()); ec.value(), ec.message());
} }
} else { } else {
self->process_storage_test_req(height, tester_addr, msg_hash); self->process_storage_test_req(height, tester_pk, msg_hash);
} }
}); });
@ -389,6 +394,29 @@ void connection_t::process_storage_test_req(uint64_t height,
} }
} }
void connection_t::process_blockchain_test_req(uint64_t,
const std::string& tester_pk,
bc_test_params_t params) {
// Note: `height` can be 0, which is the default value for old SS, allowed
// pre HF13
LOKI_LOG(debug, "Performing blockchain test");
auto callback = [this](blockchain_test_answer_t answer) {
this->response_.result(http::status::ok);
nlohmann::json json_res;
json_res["res_height"] = answer.res_height;
this->body_stream_ << json_res.dump();
this->write_response();
};
/// TODO: this should first check if tester/testee are correct! (use `height`)
service_node_.perform_blockchain_test(params, std::move(callback));
}
void connection_t::process_swarm_req(boost::string_view target) { void connection_t::process_swarm_req(boost::string_view target) {
#ifndef DISABLE_SNODE_SIGNATURE #ifndef DISABLE_SNODE_SIGNATURE
@ -434,8 +462,7 @@ void connection_t::process_swarm_req(boost::string_view target) {
const auto it = header_.find(LOKI_SENDER_SNODE_PUBKEY_HEADER); const auto it = header_.find(LOKI_SENDER_SNODE_PUBKEY_HEADER);
if (it != header_.end()) { if (it != header_.end()) {
std::string& tester_pk = it->second; const std::string& tester_pk = it->second;
tester_pk.append(".snode");
this->process_storage_test_req(blk_height, tester_pk, msg_hash); this->process_storage_test_req(blk_height, tester_pk, msg_hash);
} else { } else {
LOKI_LOG(debug, "Ignoring test request, no pubkey present"); LOKI_LOG(debug, "Ignoring test request, no pubkey present");
@ -455,28 +482,34 @@ void connection_t::process_swarm_req(boost::string_view target) {
bc_test_params_t params; bc_test_params_t params;
// Height that should be used to check derive tester/testee
uint64_t height = 0;
try { try {
params.max_height = body.at("max_height").get<uint64_t>(); params.max_height = body.at("max_height").get<uint64_t>();
params.seed = body.at("seed").get<uint64_t>(); params.seed = body.at("seed").get<uint64_t>();
if (body.find("height") != body.end()) {
height = body.at("height").get<uint64_t>();
} else {
LOKI_LOG(debug, "No tester height, defaulting to {}", height);
}
} catch (...) { } catch (...) {
response_.result(http::status::bad_request); response_.result(http::status::bad_request);
LOKI_LOG(debug, "Bad snode test request: missing fields in json"); LOKI_LOG(debug, "Bad snode test request: missing fields in json");
return; return;
} }
delay_response_ = true; /// TODO: only check pubkey field once (in validate snode req)
const auto it = header_.find(LOKI_SENDER_SNODE_PUBKEY_HEADER);
if (it != header_.end()) {
const std::string& tester_pk = it->second;
delay_response_ = true;
this->process_blockchain_test_req(height, tester_pk, params);
} else {
LOKI_LOG(debug, "Ignoring test request, no pubkey present");
}
auto callback = [this](blockchain_test_answer_t answer) {
this->response_.result(http::status::ok);
nlohmann::json json_res;
json_res["res_height"] = answer.res_height;
this->body_stream_ << json_res.dump();
this->write_response();
};
service_node_.perform_blockchain_test(params, callback);
} else if (target == "/swarms/ping_test/v1") { } else if (target == "/swarms/ping_test/v1") {
response_.result(http::status::ok); response_.result(http::status::ok);
} else if (target == "/swarms/push/v1") { } else if (target == "/swarms/push/v1") {

View file

@ -255,6 +255,10 @@ class connection_t : public std::enable_shared_from_this<connection_t> {
const std::string& tester_addr, const std::string& tester_addr,
const std::string& msg_hash); const std::string& msg_hash);
void process_blockchain_test_req(uint64_t height,
const std::string& tester_pk,
bc_test_params_t params);
bool parse_header(const char* key); bool parse_header(const char* key);
template <typename... Args> template <typename... Args>

View file

@ -346,7 +346,7 @@ ServiceNode::~ServiceNode() {
void ServiceNode::relay_data_reliable(const std::shared_ptr<request_t>& req, void ServiceNode::relay_data_reliable(const std::shared_ptr<request_t>& req,
const sn_record_t& sn) const { const sn_record_t& sn) const {
LOKI_LOG(debug, "Relaying data to: {}", sn); LOKI_LOG(trace, "Relaying data to: {}", sn);
// Note: often one of the reason for failure here is that the node has just // Note: often one of the reason for failure here is that the node has just
// deregistered but our SN hasn't updated its swarm list yet. // deregistered but our SN hasn't updated its swarm list yet.
@ -466,7 +466,7 @@ void ServiceNode::push_message(const message_t& msg) {
const auto& others = swarm_->other_nodes(); const auto& others = swarm_->other_nodes();
LOKI_LOG(debug, "push_message to {} other nodes", others.size()); LOKI_LOG(trace, "push_message to {} other nodes", others.size());
std::string body; std::string body;
serialize_message(body, msg); serialize_message(body, msg);
@ -524,7 +524,7 @@ void ServiceNode::save_if_new(const message_t& msg) {
if (db_->store(msg.hash, msg.pub_key, msg.data, msg.ttl, msg.timestamp, if (db_->store(msg.hash, msg.pub_key, msg.data, msg.ttl, msg.timestamp,
msg.nonce)) { msg.nonce)) {
notify_listeners(msg.pub_key, msg); notify_listeners(msg.pub_key, msg);
LOKI_LOG(debug, "saved message: {}", msg.data); LOKI_LOG(trace, "saved message: {}", msg.data);
} }
} }
@ -846,12 +846,13 @@ void ServiceNode::attach_pubkey(std::shared_ptr<request_t>& request) const {
void abort_if_integration_test() { void abort_if_integration_test() {
#ifdef INTEGRATION_TEST #ifdef INTEGRATION_TEST
LOKI_LOG(error, "ABORT in integration test"); LOKI_LOG(critical, "ABORT in integration test");
abort(); abort();
#endif #endif
} }
void ServiceNode::send_storage_test_req(const sn_record_t& testee, void ServiceNode::send_storage_test_req(const sn_record_t& testee,
uint64_t test_height,
const Item& item) { const Item& item) {
auto callback = [testee, item, height = this->block_height_, auto callback = [testee, item, height = this->block_height_,
@ -891,7 +892,7 @@ void ServiceNode::send_storage_test_req(const sn_record_t& testee,
nlohmann::json json_body; nlohmann::json json_body;
json_body["height"] = block_height_; json_body["height"] = test_height;
json_body["hash"] = item.hash; json_body["hash"] = item.hash;
auto req = make_post_request("/swarms/storage_test/v1", json_body.dump()); auto req = make_post_request("/swarms/storage_test/v1", json_body.dump());
@ -909,12 +910,14 @@ void ServiceNode::send_storage_test_req(const sn_record_t& testee,
void ServiceNode::send_blockchain_test_req(const sn_record_t& testee, void ServiceNode::send_blockchain_test_req(const sn_record_t& testee,
bc_test_params_t params, bc_test_params_t params,
uint64_t test_height,
blockchain_test_answer_t answer) { blockchain_test_answer_t answer) {
nlohmann::json json_body; nlohmann::json json_body;
json_body["max_height"] = params.max_height; json_body["max_height"] = params.max_height;
json_body["seed"] = params.seed; json_body["seed"] = params.seed;
json_body["height"] = test_height;
auto req = auto req =
make_post_request("/swarms/blockchain_test/v1", json_body.dump()); make_post_request("/swarms/blockchain_test/v1", json_body.dump());
@ -989,7 +992,8 @@ bool ServiceNode::derive_tester_testee(uint64_t blk_height, sn_record_t& tester,
block_hash = block_hash_; block_hash = block_hash_;
} else if (blk_height < block_height_) { } else if (blk_height < block_height_) {
LOKI_LOG(debug, "got storage test request for an older block"); LOKI_LOG(debug, "got storage test request for an older block: {}/{}",
blk_height, block_height_);
const auto it = const auto it =
std::find_if(block_hashes_cache_.begin(), block_hashes_cache_.end(), std::find_if(block_hashes_cache_.begin(), block_hashes_cache_.end(),
@ -1033,7 +1037,7 @@ bool ServiceNode::derive_tester_testee(uint64_t blk_height, sn_record_t& tester,
} }
MessageTestStatus ServiceNode::process_storage_test_req( MessageTestStatus ServiceNode::process_storage_test_req(
uint64_t blk_height, const std::string& tester_addr, uint64_t blk_height, const std::string& tester_pk,
const std::string& msg_hash, std::string& answer) { const std::string& msg_hash, std::string& answer) {
// 1. Check height, retry if we are behind // 1. Check height, retry if we are behind
@ -1052,17 +1056,17 @@ MessageTestStatus ServiceNode::process_storage_test_req(
derive_tester_testee(blk_height, tester, testee); derive_tester_testee(blk_height, tester, testee);
if (testee != our_address_) { if (testee != our_address_) {
LOKI_LOG(debug, "We are NOT the testee for height: {}", blk_height); LOKI_LOG(error, "We are NOT the testee for height: {}", blk_height);
return MessageTestStatus::ERROR; return MessageTestStatus::ERROR;
} }
if (tester.sn_address() != tester_addr) { if (tester.pub_key() != tester_pk) {
LOKI_LOG(debug, "Wrong tester: {}, expected: {}", tester_addr, LOKI_LOG(debug, "Wrong tester: {}, expected: {}", tester_pk,
tester.sn_address()); tester.sn_address());
abort_if_integration_test(); abort_if_integration_test();
return MessageTestStatus::ERROR; return MessageTestStatus::ERROR;
} else { } else {
LOKI_LOG(trace, "Tester is valid: {}", tester_addr); LOKI_LOG(trace, "Tester is valid: {}", tester_pk);
} }
} }
@ -1110,11 +1114,26 @@ void ServiceNode::initiate_peer_test() {
// 1. Select the tester/testee pair // 1. Select the tester/testee pair
sn_record_t tester, testee; sn_record_t tester, testee;
if (!derive_tester_testee(block_height_, tester, testee)) {
/// We test based on the height a few blocks back to minimise discrepancies
/// between nodes (we could also use checkpoints, but that is still not
/// bulletproof: swarms are calculated based on the latest block, so they
/// might be still different and thus derive different pairs)
constexpr uint64_t TEST_BLOCKS_BUFFER = 4;
if (block_height_ < TEST_BLOCKS_BUFFER) {
LOKI_LOG(debug, "Height {} is too small, skipping all tests",
block_height_);
return; return;
} }
LOKI_LOG(trace, "For height {}; tester: {} testee: {}", block_height_, const uint64_t test_height = block_height_ - TEST_BLOCKS_BUFFER;
if (!derive_tester_testee(test_height, tester, testee)) {
return;
}
LOKI_LOG(trace, "For height {}; tester: {} testee: {}", test_height,
tester, testee); tester, testee);
if (tester != our_address_) { if (tester != our_address_) {
@ -1133,7 +1152,7 @@ void ServiceNode::initiate_peer_test() {
item.data); item.data);
// 2.2. Initiate testing request // 2.2. Initiate testing request
send_storage_test_req(testee, item); send_storage_test_req(testee, test_height, item);
} }
} }
@ -1148,7 +1167,7 @@ void ServiceNode::initiate_peer_test() {
constexpr uint64_t CHECKPOINT_DISTANCE = 4; constexpr uint64_t CHECKPOINT_DISTANCE = 4;
// We can be confident that blockchain data won't // We can be confident that blockchain data won't
// change if we go this many blocks back // change if we go this many blocks back
constexpr uint64_t SAFETY_BUFFER_BLOCKS = CHECKPOINT_DISTANCE * 2; constexpr uint64_t SAFETY_BUFFER_BLOCKS = CHECKPOINT_DISTANCE * 3;
if (block_height_ <= SAFETY_BUFFER_BLOCKS) { if (block_height_ <= SAFETY_BUFFER_BLOCKS) {
LOKI_LOG(debug, LOKI_LOG(debug,
@ -1162,11 +1181,14 @@ void ServiceNode::initiate_peer_test() {
const uint64_t rng_seed = std::chrono::high_resolution_clock::now() const uint64_t rng_seed = std::chrono::high_resolution_clock::now()
.time_since_epoch() .time_since_epoch()
.count(); .count();
// TODO: This is slow, fix it!
std::mt19937_64 mt(rng_seed); std::mt19937_64 mt(rng_seed);
params.seed = mt(); params.seed = mt();
auto callback = std::bind(&ServiceNode::send_blockchain_test_req, this, auto callback =
testee, params, std::placeholders::_1); std::bind(&ServiceNode::send_blockchain_test_req, this, testee,
params, test_height, std::placeholders::_1);
/// Compute your own answer, then initiate a test request /// Compute your own answer, then initiate a test request
perform_blockchain_test(params, callback); perform_blockchain_test(params, callback);

View file

@ -19,7 +19,7 @@
#include "stats.h" #include "stats.h"
#include "swarm.h" #include "swarm.h"
static constexpr size_t BLOCK_HASH_CACHE_SIZE = 20; static constexpr size_t BLOCK_HASH_CACHE_SIZE = 30;
static constexpr int STORAGE_SERVER_HARDFORK = 12; static constexpr int STORAGE_SERVER_HARDFORK = 12;
class Database; class Database;
@ -186,11 +186,12 @@ class ServiceNode {
sn_record_t& testee); sn_record_t& testee);
/// Send a request to a SN under test /// Send a request to a SN under test
void send_storage_test_req(const sn_record_t& testee, void send_storage_test_req(const sn_record_t& testee, uint64_t test_height,
const storage::Item& item); const storage::Item& item);
void send_blockchain_test_req(const sn_record_t& testee, void send_blockchain_test_req(const sn_record_t& testee,
bc_test_params_t params, bc_test_params_t params,
uint64_t test_height,
blockchain_test_answer_t answer); blockchain_test_answer_t answer);
/// From a peer /// From a peer