Fix service node winner incorrect error

If we are re-deriving states we still also need the historical quorums
such that we can process incoming blocks with state changes. Without it,
state changes will be ignored and skipped causing inconsistent state in
the service node list.

This mandates a rescan and stores the most recent in state_ts, and only
just quorums for states preceeding 10k intervals. We can no longer store
the 1st most recent state in the DB as that will be missing quorums as
well for similar reason when rederiving on start-up.
This commit is contained in:
Doyle 2019-08-16 15:30:36 +10:00
parent e940614b69
commit 8fdf2a7819
4 changed files with 148 additions and 137 deletions

View File

@ -171,26 +171,21 @@ namespace service_nodes
return sort_and_filter(service_nodes_infos, [](const service_node_info &info) { return info.is_decommissioned() && info.is_fully_funded(); }, /*reserve=*/ false);
}
std::shared_ptr<const testing_quorum> service_node_list::get_testing_quorum(quorum_type type, uint64_t height, bool include_old) const
quorum_manager service_node_list::get_quorum_manager(uint64_t height, bool include_old) const
{
if (type == quorum_type::checkpointing) {
if (height < REORG_SAFETY_BUFFER_BLOCKS_POST_HF12)
return nullptr;
height -= REORG_SAFETY_BUFFER_BLOCKS_POST_HF12;
}
quorum_manager const *result_ptr = nullptr;
std::lock_guard<boost::recursive_mutex> lock(m_sn_mutex);
quorum_manager const *quorums = nullptr;
if (height == m_state.height)
quorums = &m_state.quorums;
result_ptr = &m_state.quorums;
else // NOTE: Search m_state_history
{
auto it = m_state_history.find(height);
if (it != m_state_history.end())
quorums = &it->quorums;
result_ptr = &it->quorums;
}
if (!quorums && include_old) // NOTE: Search m_old_quorum_states
if (!result_ptr && include_old) // NOTE: Search m_old_quorum_states
{
auto it =
std::lower_bound(m_old_quorum_states.begin(),
@ -199,34 +194,39 @@ namespace service_nodes
[](quorums_by_height const &entry, uint64_t height) { return entry.height < height; });
if (it != m_old_quorum_states.end() && it->height == height)
quorums = &it->quorums;
result_ptr = &it->quorums;
}
if (!quorums)
return nullptr;
quorum_manager empty_result = {};
if (!result_ptr) result_ptr = &empty_result;
return *result_ptr;
}
std::shared_ptr<const testing_quorum> service_node_list::get_testing_quorum(quorum_type type, uint64_t height, bool include_old) const
{
if (type == quorum_type::checkpointing) {
if (height < REORG_SAFETY_BUFFER_BLOCKS_POST_HF12)
return nullptr;
height -= REORG_SAFETY_BUFFER_BLOCKS_POST_HF12;
}
quorum_manager quorums = get_quorum_manager(height, include_old);
if (type == quorum_type::obligations)
return quorums->obligations;
return quorums.obligations;
else if (type == quorum_type::checkpointing)
return quorums->checkpointing;
return quorums.checkpointing;
MERROR("Developer error: Unhandled quorum enum with value: " << (size_t)type);
assert(!"Developer error: Unhandled quorum enum");
return nullptr;
}
bool service_node_list::get_quorum_pubkey(quorum_type type, quorum_group group, uint64_t height, size_t quorum_index, crypto::public_key &key) const
static bool get_quorum_pubkey_from_quorum(testing_quorum const &quorum, quorum_group group, size_t quorum_index, crypto::public_key &key)
{
std::shared_ptr<const testing_quorum> quorum = get_testing_quorum(type, height);
if (!quorum)
{
LOG_PRINT_L1("Quorum for height: " << height << ", was not stored by the daemon");
return false;
}
std::vector<crypto::public_key> const *array = nullptr;
if (group == quorum_group::validator) array = &quorum->validators;
else if (group == quorum_group::worker) array = &quorum->workers;
if (group == quorum_group::validator) array = &quorum.validators;
else if (group == quorum_group::worker) array = &quorum.workers;
else
{
MERROR("Invalid quorum group specified");
@ -243,6 +243,19 @@ namespace service_nodes
return true;
}
bool service_node_list::get_quorum_pubkey(quorum_type type, quorum_group group, uint64_t height, size_t quorum_index, crypto::public_key &key) const
{
std::shared_ptr<const testing_quorum> quorum = get_testing_quorum(type, height);
if (!quorum)
{
LOG_PRINT_L1("Quorum for height: " << height << ", was not stored by the daemon");
return false;
}
bool result = get_quorum_pubkey_from_quorum(*quorum, group, quorum_index, key);
return result;
}
std::vector<service_node_pubkey_info> service_node_list::get_service_node_list_state(const std::vector<crypto::public_key> &service_node_pubkeys) const
{
std::lock_guard<boost::recursive_mutex> lock(m_sn_mutex);
@ -407,7 +420,8 @@ namespace service_nodes
}
crypto::public_key key;
if (!get_quorum_pubkey(quorum_type::obligations, quorum_group::worker, state_change.block_height, state_change.service_node_index, key))
quorum_manager quorums = get_quorum_manager(state_change.block_height);
if (!quorums.obligations || !get_quorum_pubkey_from_quorum(*quorums.obligations, quorum_group::worker, state_change.service_node_index, key))
return false;
auto iter = m_state.service_nodes_infos.find(key);
@ -1142,13 +1156,30 @@ namespace service_nodes
{
uint64_t cull_height = (block_height < MAX_SHORT_TERM_STATE_HISTORY) ? 0 : block_height - MAX_SHORT_TERM_STATE_HISTORY;
auto it = m_state_history.find(cull_height);
// TODO(loki): Don't keep state migrated from serialized v4.0.3s since they are incomplete. Remove after everyone has upgraded
if (it != m_state_history.end() && (it->height % STORE_LONG_TERM_STATE_INTERVAL != 0 || it->is_migrated_from_v403()))
if (it != m_state_history.end())
{
if (m_store_quorum_history)
m_old_quorum_states.emplace_back(it->height, it->quorums);
it = m_state_history.erase(it);
uint64_t next_long_term_state = ((it->height / STORE_LONG_TERM_STATE_INTERVAL) + 1) * STORE_LONG_TERM_STATE_INTERVAL;
uint64_t dist_to_next_long_term_state = next_long_term_state - it->height;
bool need_quorum_for_future_states = (dist_to_next_long_term_state <= VOTE_LIFETIME + VOTE_OR_TX_VERIFY_HEIGHT_BUFFER);
if (it->height % STORE_LONG_TERM_STATE_INTERVAL == 0)
{
// Preserve everything
}
else if (need_quorum_for_future_states)
{
// Preserve just quorum
state_t &state = const_cast<state_t &>(*it); // safe: set order only depends on state_t.height
state.service_nodes_infos = {};
state.key_image_blacklist = {};
state.only_loaded_quorums = true;
}
else
{
if (m_store_quorum_history)
m_old_quorum_states.emplace_back(it->height, it->quorums);
it = m_state_history.erase(it);
}
}
if (m_old_quorum_states.size() > m_store_quorum_history)
@ -1269,6 +1300,8 @@ namespace service_nodes
update_swarms(block_height);
}
uint64_t constexpr MIN_DERIVABLE_SHORT_TERM_STATE_OFFSET =
MAX_SHORT_TERM_STATE_HISTORY - VOTE_LIFETIME - VOTE_OR_TX_VERIFY_HEIGHT_BUFFER;
void service_node_list::blockchain_detached(uint64_t height)
{
std::lock_guard<boost::recursive_mutex> lock(m_sn_mutex);
@ -1276,35 +1309,41 @@ namespace service_nodes
if (m_state.height == height)
return;
auto it = m_state_history.lower_bound(height);
// NOTE: The first (VOTE_LIFETIME + VOTE_OR_TX_VERIFY_HEIGHT_BUFFER) of the
// recent states we store, don't have quorum information preceeding it. So
// if we pop to within that range, we can't restore the service node list to
// its original state since we might not be able to verify state changes
// relying on
// quorums from before it.
bool reinitialise = false;
if (it == m_state_history.end())
reinitialise = true;
else
bool reinitialise = (height < STORE_LONG_TERM_STATE_INTERVAL || height < MIN_DERIVABLE_SHORT_TERM_STATE_OFFSET);
if (!reinitialise)
{
m_state_history.erase(it, m_state_history.end());
// TODO(loki): If historical state is serialized from v4.0.3 they are incomplete, need a full rescan. Delete code block after everyone has upgraded
if (m_state_history.size()) {
auto &latest = *m_state_history.rbegin();
reinitialise = (latest.is_migrated_from_v403() || latest.height > height);
uint64_t height_delta = m_state.height - height;
if (height_delta <= MIN_DERIVABLE_SHORT_TERM_STATE_OFFSET) // Detaching to recent state, which we store all necessary information for
{
auto it = m_state_history.find(height);
if (it == m_state_history.end()) reinitialise = true;
else m_state_history.erase(it, m_state_history.end());
}
else // Detach large amount, we only store complete state every STORE_LONG_TERM_STATE_INTERVAL blocks.
{
uint64_t prev_interval = height - (height % STORE_LONG_TERM_STATE_INTERVAL);
auto it = m_state_history.upper_bound(prev_interval);
if (it == m_state_history.end()) reinitialise = true;
else m_state_history.erase(it, m_state_history.end());
}
else
reinitialise = true;
}
if (reinitialise)
// NOTE: only_loaded_quorums should not be true here should be handled above, but sanity check anyway
auto it = std::prev(m_state_history.end());
if (m_state_history.empty() || reinitialise || it->only_loaded_quorums)
{
// TODO(loki): If historical state is serialized from v4.0.3 they are incomplete, need a full rescan. Delete code block after everyone has upgraded
if (m_state_history.size() && m_state_history.rbegin()->is_migrated_from_v403())
reset(true);
m_state_history.clear();
init();
return;
}
it = std::prev(m_state_history.end());
m_state = std::move(*it);
m_state_history.erase(it);
@ -1522,7 +1561,6 @@ namespace service_nodes
static service_node_list::quorum_for_serialization serialize_quorum_state(uint8_t hf_version, uint64_t height, quorum_manager const &quorums)
{
service_node_list::quorum_for_serialization result = {};
result.version = get_min_service_node_info_version_for_hf(hf_version);
result.height = height;
if (quorums.obligations) result.quorums[static_cast<uint8_t>(quorum_type::obligations)] = *quorums.obligations;
if (quorums.checkpointing) result.quorums[static_cast<uint8_t>(quorum_type::checkpointing)] = *quorums.checkpointing;
@ -1532,7 +1570,7 @@ namespace service_nodes
static service_node_list::state_serialized serialize_service_node_state_object(uint8_t hf_version, service_node_list::state_t const &state)
{
service_node_list::state_serialized result = {};
result.version = get_min_service_node_info_version_for_hf(hf_version);
result.version = service_node_list::state_serialized::get_version(hf_version);
result.infos.reserve(state.service_nodes_infos.size());
for (const auto &kv_pair : state.service_nodes_infos)
@ -1541,6 +1579,7 @@ namespace service_nodes
result.key_image_blacklist = state.key_image_blacklist;
result.height = state.height;
result.quorums = serialize_quorum_state(hf_version, state.height, state.quorums);
result.only_stored_quorums = state.only_loaded_quorums;
return result;
}
@ -1555,41 +1594,23 @@ namespace service_nodes
static data_for_serialization data = {}; // NOTE: Static to avoid constant reallocation
data.clear();
data.version = get_min_service_node_info_version_for_hf(hf_version);
data.version = data_for_serialization::get_version(hf_version);
{
std::lock_guard<boost::recursive_mutex> lock(m_sn_mutex);
data.quorum_states.reserve(m_old_quorum_states.size());
for (const quorums_by_height &entry : m_old_quorum_states)
data.quorum_states.push_back(serialize_quorum_state(hf_version, entry.height, entry.quorums));
if (m_state_history.size() == 1)
size_t num_to_reserve = 1; // recent state
if (m_state_history.size() > MAX_SHORT_TERM_STATE_HISTORY)
num_to_reserve += m_state_history.size() - (MAX_SHORT_TERM_STATE_HISTORY - 1);
data.states.reserve(num_to_reserve);
uint64_t const min_height = m_state.height - MIN_DERIVABLE_SHORT_TERM_STATE_OFFSET;
for (auto it : m_state_history)
{
data.states.push_back(serialize_service_node_state_object(hf_version, *m_state_history.begin()));
}
else if (m_state_history.size() >= 2)
{
size_t num_to_reserve = 1; // recent state
if (m_state_history.size() > MAX_SHORT_TERM_STATE_HISTORY)
// + 10k historic blocks (-1 because the oldest short-term history might also be historic)
num_to_reserve += m_state_history.size() - (MAX_SHORT_TERM_STATE_HISTORY - 1);
data.states.reserve(num_to_reserve);
for (auto it = m_state_history.cbegin(), nextit = std::next(it); nextit != m_state_history.cend(); it = nextit++)
{
state_t const &curr = *it, &next = *nextit;
data.states.push_back(serialize_service_node_state_object(hf_version, curr));
uint64_t height_delta = next.height - curr.height;
if (height_delta != STORE_LONG_TERM_STATE_INTERVAL)
{
// TODO(loki): Preserve the quorum state from v403 until they all get purged from storage then we can remove that check.
if (next.is_migrated_from_v403() && next.height != m_state_history.rbegin()->height)
continue;
data.states.push_back(serialize_service_node_state_object(hf_version, next));
break;
}
}
if (it.height > min_height) break;
data.states.push_back(serialize_service_node_state_object(hf_version, it));
}
}
@ -1785,7 +1806,9 @@ namespace service_nodes
}
service_node_list::state_t::state_t(state_serialized &&state)
: height{state.height}, key_image_blacklist{std::move(state.key_image_blacklist)}
: height{state.height}
, key_image_blacklist{std::move(state.key_image_blacklist)}
, only_loaded_quorums{state.only_stored_quorums}
{
for (auto &pubkey_info : state.infos)
service_nodes_infos.emplace(std::move(pubkey_info.pubkey), std::move(pubkey_info.info));
@ -1828,31 +1851,14 @@ namespace service_nodes
CHECK_AND_ASSERT_MES(old_data || new_data, false, "Failed to parse service node data from blob");
if (old_data)
{
new_data_in = {};
new_data_in.states.reserve(old_data_in.quorum_states.size() + 1);
for (quorum_for_serialization &entry : old_data_in.quorum_states)
{
if (entry.height == old_data_in.height)
continue;
new_data_in.states.emplace_back();
state_serialized &new_state = new_data_in.states.back();
new_state.height = entry.height;
new_state.quorums = std::move(entry);
}
new_data_in.states.emplace_back();
state_serialized &new_state = new_data_in.states.back();
new_data_in.version = old_data_in.version;
new_state.height = old_data_in.height;
new_state.infos = std::move(old_data_in.infos);
new_state.key_image_blacklist = std::move(old_data_in.key_image_blacklist);
}
return false;
}
if (new_data_in.states.empty())
if (new_data_in.states.empty() ||
new_data_in.states[0].version <= state_serialized::version_t::version_0_v404_missing_state_changes)
{
return false;
}
{
const uint64_t hist_state_from_height = current_height - m_store_quorum_history;

View File

@ -239,6 +239,7 @@ namespace service_nodes
/// do no subtract off the buffer in advance)
/// return: nullptr if the quorum is not cached in memory (pruned from memory).
std::shared_ptr<const testing_quorum> get_testing_quorum(quorum_type type, uint64_t height, bool include_old = false) const;
quorum_manager get_quorum_manager(uint64_t height, bool include_old = false) const;
bool get_quorum_pubkey(quorum_type type, quorum_group group, uint64_t height, size_t quorum_index, crypto::public_key &key) const;
std::vector<service_node_pubkey_info> get_service_node_list_state(const std::vector<crypto::public_key> &service_node_pubkeys) const;
@ -380,30 +381,48 @@ namespace service_nodes
struct state_serialized
{
uint8_t version;
enum struct version_t
{
version_0_v404_missing_state_changes,
version_1_required_quorums_for_skipped_state_changes,
count,
};
static version_t get_version(uint8_t /*hf_version*/) { return version_t::version_1_required_quorums_for_skipped_state_changes; }
version_t version;
uint64_t height;
std::vector<service_node_pubkey_info> infos;
std::vector<key_image_blacklist_entry> key_image_blacklist;
quorum_for_serialization quorums;
bool only_stored_quorums;
BEGIN_SERIALIZE()
VARINT_FIELD(version)
ENUM_FIELD(version, version < version_t::count)
VARINT_FIELD(height)
FIELD(infos)
FIELD(key_image_blacklist)
FIELD(quorums)
FIELD(only_stored_quorums)
END_SERIALIZE()
};
struct data_for_serialization
{
uint8_t version;
enum struct version_t
{
version_0_v404_missing_state_changes,
version_1_store_historic_states_separate,
count,
};
static version_t get_version(uint8_t /*hf_version*/) { return version_t::version_1_store_historic_states_separate; }
version_t version;
std::vector<quorum_for_serialization> quorum_states;
std::vector<state_serialized> states;
void clear() { quorum_states.clear(); states.clear(); version = 0; }
void clear() { quorum_states.clear(); states.clear(); version = {}; }
BEGIN_SERIALIZE()
VARINT_FIELD(version)
ENUM_FIELD(version, version < version_t::count)
FIELD(quorum_states)
FIELD(states)
END_SERIALIZE()
@ -412,25 +431,11 @@ namespace service_nodes
using block_height = uint64_t;
struct state_t
{
// TODO(loki): Remove after we HF13 and can be sure that everyone has
// upgraded to the new blob serialization style where we store quorums
// with state_t instead of individually.
// So do this hackily if there are no service nodes infos stored then we
// presume it's been migrated from v4.0.3 blobs which only preserved
// historical quorums. Deriving this without a new variable means we don't
// have to serialise new data between saves and loads and reduces the
// temporary code we need to manage to get this working satisfactorily.
// These state_t's will only have loaded data about quorums and height.
// Everything else is missing, so if something tries to access the info or
// blacklist then we know we have to trigger a rescan.
bool is_migrated_from_v403() const { return service_nodes_infos.empty(); }
service_nodes_infos_t service_nodes_infos;
bool only_loaded_quorums;
service_nodes_infos_t service_nodes_infos;
std::vector<key_image_blacklist_entry> key_image_blacklist;
block_height height{0};
mutable quorum_manager quorums; // Mutable because we are allowed to (and need to) change it via std::set iterator
mutable quorum_manager quorums; // Mutable because we are allowed to (and need to) change it via std::set iterator
state_t() = default;
state_t(block_height height) : height{height} {}
@ -482,9 +487,9 @@ namespace service_nodes
constexpr bool operator()(const state_t &lhs, const state_t &rhs) const { return lhs.height < rhs.height; }
};
std::deque<quorums_by_height> m_old_quorum_states; // Store all old quorum history only if run with --store-full-quorum-history
std::set<state_t, state_t_less> m_state_history;
state_t m_state;
std::deque<quorums_by_height> m_old_quorum_states; // Store all old quorum history only if run with --store-full-quorum-history
std::set<state_t, state_t_less> m_state_history;
state_t m_state;
};
bool reg_tx_extract_fields(const cryptonote::transaction& tx, std::vector<cryptonote::account_public_address>& addresses, uint64_t& portions_for_operator, std::vector<uint64_t>& portions, uint64_t& expiration_timestamp, crypto::public_key& service_node_key, crypto::signature& signature, crypto::public_key& tx_pub_key);

View File

@ -92,6 +92,11 @@ namespace service_nodes {
constexpr uint64_t STATE_CHANGE_TX_LIFETIME_IN_BLOCKS = VOTE_LIFETIME;
// If we get an incoming vote of state change tx that is outside the acceptable range by this many
// blocks then ignore it but don't trigger a connection drop; the sending side could be a couple
// blocks out of sync and sending something that it thinks is legit.
constexpr uint64_t VOTE_OR_TX_VERIFY_HEIGHT_BUFFER = 5;
using swarm_id_t = uint64_t;
constexpr swarm_id_t UNASSIGNED_SWARM_ID = UINT64_MAX;

View File

@ -131,11 +131,6 @@ namespace service_nodes
return true;
}
// If we get an incoming vote of state change tx that is outside the acceptable range by this many
// blocks then ignore it but don't trigger a connection drop; the sending side could be a couple
// blocks out of sync and sending something that it thinks is legit.
constexpr uint64_t VERIFY_HEIGHT_BUFFER = 5;
static bool bad_tx(cryptonote::tx_verification_context &tvc) {
tvc.m_verifivation_failed = true;
return false;
@ -185,7 +180,7 @@ namespace service_nodes
<< ", is newer than current height: " << latest_height
<< " blocks and has been rejected.");
vvc.m_invalid_block_height = true;
if (state_change.block_height >= latest_height + VERIFY_HEIGHT_BUFFER)
if (state_change.block_height >= latest_height + VOTE_OR_TX_VERIFY_HEIGHT_BUFFER)
tvc.m_verifivation_failed = true;
return false;
}
@ -198,7 +193,7 @@ namespace service_nodes
<< " (current height: " << latest_height << ") "
<< "blocks and has been rejected.");
vvc.m_invalid_block_height = true;
if (latest_height >= state_change.block_height + (service_nodes::STATE_CHANGE_TX_LIFETIME_IN_BLOCKS + VERIFY_HEIGHT_BUFFER))
if (latest_height >= state_change.block_height + (service_nodes::STATE_CHANGE_TX_LIFETIME_IN_BLOCKS + VOTE_OR_TX_VERIFY_HEIGHT_BUFFER))
tvc.m_verifivation_failed = true;
return false;
}
@ -309,14 +304,14 @@ namespace service_nodes
bool height_in_buffer = false;
if (latest_height > vote.block_height + VOTE_LIFETIME)
{
height_in_buffer = latest_height <= vote.block_height + (VOTE_LIFETIME + VERIFY_HEIGHT_BUFFER);
height_in_buffer = latest_height <= vote.block_height + (VOTE_LIFETIME + VOTE_OR_TX_VERIFY_HEIGHT_BUFFER);
LOG_PRINT_L1("Received vote for height: " << vote.block_height << ", is older than: " << VOTE_LIFETIME
<< " blocks and has been rejected.");
vvc.m_invalid_block_height = true;
}
else if (vote.block_height > latest_height)
{
height_in_buffer = vote.block_height <= latest_height + VERIFY_HEIGHT_BUFFER;
height_in_buffer = vote.block_height <= latest_height + VOTE_OR_TX_VERIFY_HEIGHT_BUFFER;
LOG_PRINT_L1("Received vote for height: " << vote.block_height << ", is newer than: " << latest_height
<< " (latest block height) and has been rejected.");
vvc.m_invalid_block_height = true;