Pulse: fix off by 2 quorum gen bug, recategories log msgs

This commit is contained in:
Doyle 2020-08-14 17:07:01 +10:00
parent d232e2c2b4
commit 9213df406c
3 changed files with 42 additions and 33 deletions

View file

@ -295,7 +295,7 @@ std::string msg_source_string(round_context const &context, pulse::message const
if (msg.quorum_position < context.prepare_for_round.quorum.validators.size())
{
crypto::public_key const &key = context.prepare_for_round.quorum.validators[msg.quorum_position];
stream << ":" << lokimq::to_hex(tools::view_guts(key));
stream << ":" << key;
}
}
@ -413,8 +413,8 @@ bool enforce_validator_participation_and_timeouts(round_context const &context,
if (timed_out && !all_received)
{
MDEBUG(log_prefix(context) << "We timed out and there were insufficient hashes, required "
<< service_nodes::PULSE_BLOCK_REQUIRED_SIGNATURES << ", received "
<< stage.msgs_received << " from " << bitset_view16(stage.bitset));
<< service_nodes::PULSE_BLOCK_REQUIRED_SIGNATURES << ", received " << stage.msgs_received
<< " from " << bitset_view16(stage.bitset));
return false;
}
@ -479,7 +479,7 @@ void pulse::handle_message(void *quorumnet_state, pulse::message const &msg)
// TODO(doyle): We need to support potentially receiving messages from the future up to 1 round.
if (msg.round != context.prepare_for_round.round)
{
MERROR(log_prefix(context) << "Message received from a future round too early, " << msg_source_string(context, msg) << ", dropping the message.");
MTRACE(log_prefix(context) << "Message received from a future round too early, " << msg_source_string(context, msg) << ", dropping the message.");
return;
}
@ -500,7 +500,7 @@ void pulse::handle_message(void *quorumnet_state, pulse::message const &msg)
auto &[entry, queued] = stage->queue.buffer[msg.quorum_position];
if (queued == queueing_state::empty)
{
MINFO(log_prefix(context) << "Message received early " << msg_source_string(context, msg) << ", queueing until we're ready.");
MTRACE(log_prefix(context) << "Message received early " << msg_source_string(context, msg) << ", queueing until we're ready.");
stage->queue.count++;
entry = std::move(msg);
queued = queueing_state::received;
@ -518,14 +518,14 @@ void pulse::handle_message(void *quorumnet_state, pulse::message const &msg)
if ((validator_bit & context.transient.wait_for_handshake_bitsets.best_bitset) == 0)
{
auto bitset_view = bitset_view16(context.transient.wait_for_handshake_bitsets.best_bitset);
MINFO(log_prefix(context) << "Dropping " << msg_source_string(context, msg) << ". Not a locked in participant, bitset is " << bitset_view);
MTRACE(log_prefix(context) << "Dropping " << msg_source_string(context, msg) << ". Not a locked in participant, bitset is " << bitset_view);
return;
}
}
if (msg.quorum_position >= service_nodes::PULSE_QUORUM_NUM_VALIDATORS)
{
MINFO(log_prefix(context) << "Dropping " << msg_source_string(context, msg) << ". Message quorum position indexes oob");
MTRACE(log_prefix(context) << "Dropping " << msg_source_string(context, msg) << ". Message quorum position indexes oob");
return;
}
@ -543,8 +543,9 @@ void pulse::handle_message(void *quorumnet_state, pulse::message const &msg)
auto &quorum = context.transient.send_and_wait_for_handshakes.data;
if (quorum[msg.quorum_position]) return;
quorum[msg.quorum_position] = true;
MINFO(log_prefix(context) << "Received handshake with quorum position bit (" << msg.quorum_position << ") "
<< bitset_view16(validator_bit) << " saved to bitset " << bitset_view16(stage->bitset));
MTRACE(log_prefix(context) << "Received handshake with quorum position bit (" << msg.quorum_position << ") "
<< bitset_view16(validator_bit) << " saved to bitset "
<< bitset_view16(stage->bitset));
}
break;
@ -566,14 +567,14 @@ void pulse::handle_message(void *quorumnet_state, pulse::message const &msg)
cryptonote::block block = {};
if (!cryptonote::t_serializable_object_from_blob(block, msg.block_template.blob))
{
MINFO(log_prefix(context) << "Received unparsable pulse block template blob");
MTRACE(log_prefix(context) << "Received unparsable pulse block template blob");
return;
}
if (block.pulse.round != context.prepare_for_round.round)
{
MINFO(log_prefix(context) << "Received pulse block template specifying different round " << +block.pulse.round
<< ", expected " << +context.prepare_for_round.round);
MTRACE(log_prefix(context) << "Received pulse block template specifying different round " << +block.pulse.round
<< ", expected " << +context.prepare_for_round.round);
return;
}
@ -581,7 +582,7 @@ void pulse::handle_message(void *quorumnet_state, pulse::message const &msg)
{
auto block_bitset = bitset_view16(block.pulse.validator_bitset);
auto our_bitset = bitset_view16(context.transient.wait_for_handshake_bitsets.best_bitset);
MINFO(log_prefix(context) << "Received pulse block template specifying different validator handshake bitsets " << block_bitset << ", expected " << our_bitset);
MTRACE(log_prefix(context) << "Received pulse block template specifying different validator handshake bitsets " << block_bitset << ", expected " << our_bitset);
return;
}
@ -610,9 +611,9 @@ void pulse::handle_message(void *quorumnet_state, pulse::message const &msg)
auto derived = crypto::cn_fast_hash(msg.random_value.value.data, sizeof(msg.random_value.value.data));
if (derived != hash)
{
MINFO(log_prefix(context) << "Dropping " << msg_source_string(context, msg) << ". Rederived random value hash "
<< lokimq::to_hex(tools::view_guts(derived)) << " does not match original hash "
<< lokimq::to_hex(tools::view_guts(hash)));
MTRACE(log_prefix(context) << "Dropping " << msg_source_string(context, msg)
<< ". Rederived random value hash " << derived << " does not match original hash "
<< hash);
return;
}
}
@ -830,7 +831,7 @@ round_state wait_for_next_block(uint64_t hf16_height, round_context &context, cr
if (context.wait_for_next_block.height == curr_height)
{
for (static uint64_t last_height = 0; last_height != curr_height; last_height = curr_height)
MINFO(log_prefix(context) << "Network is currently producing block " << curr_height << ", waiting until next block");
MDEBUG(log_prefix(context) << "Network is currently producing block " << curr_height << ", waiting until next block");
return round_state::wait_for_next_block;
}
@ -861,7 +862,7 @@ round_state wait_for_next_block(uint64_t hf16_height, round_context &context, cr
if (bool orphaned = false; !blockchain.get_block_by_hash(genesis_hash, genesis_block, &orphaned) || orphaned)
{
for (static bool once = true; once; once = !once)
MINFO(log_prefix(context) << "Failed to query the genesis block for Pulse at height " << hf16_height - 1);
MERROR(log_prefix(context) << "Failed to query the genesis block for Pulse at height " << hf16_height - 1);
return round_state::wait_for_next_block;
}
@ -869,7 +870,7 @@ round_state wait_for_next_block(uint64_t hf16_height, round_context &context, cr
// NOTE: Block Timing
//
uint64_t const delta_height = context.wait_for_next_block.height - cryptonote::get_block_height(genesis_block);
#if 0
#if 1
auto genesis_timestamp = pulse::time_point(std::chrono::seconds(genesis_block.timestamp));
pulse::time_point ideal_timestamp = genesis_timestamp + (TARGET_BLOCK_TIME * delta_height);
pulse::time_point prev_timestamp = pulse::time_point(std::chrono::seconds(top_block.timestamp));
@ -942,7 +943,7 @@ round_state prepare_for_round(round_context &context, service_nodes::service_nod
context.prepare_for_round.quorum =
service_nodes::generate_pulse_quorum(blockchain.nettype(),
blockchain.get_db(),
context.wait_for_next_block.height - 1,
context.wait_for_next_block.height + 1,
blockchain.get_service_node_list().get_block_leader().key,
blockchain.get_current_hard_fork_version(),
blockchain.get_service_node_list().active_service_nodes_infos(),
@ -982,7 +983,7 @@ round_state prepare_for_round(round_context &context, service_nodes::service_nod
if (context.prepare_for_round.participant == sn_type::none)
{
MINFO(log_prefix(context) << "We are not a pulse validator. Waiting for next pulse round or block.");
MDEBUG(log_prefix(context) << "We are not a pulse validator. Waiting for next pulse round or block.");
return goto_preparing_for_next_round(context);
}
@ -993,7 +994,7 @@ round_state wait_for_round(round_context &context, cryptonote::Blockchain const
{
if (context.wait_for_next_block.height != blockchain.get_current_blockchain_height(true /*lock*/))
{
MINFO(log_prefix(context) << "Block height changed whilst waiting for round " << +context.prepare_for_round.round << ", restarting Pulse stages");
MDEBUG(log_prefix(context) << "Block height changed whilst waiting for round " << +context.prepare_for_round.round << ", restarting Pulse stages");
return round_state::wait_for_next_block;
}
@ -1088,9 +1089,8 @@ round_state wait_for_handshake_bitsets(round_context &context, void *quorumnet_s
if (timed_out || all_bitsets)
{
bool missing_bitsets = timed_out && !all_bitsets;
MINFO(log_prefix(context)
<< "Collected " << stage.msgs_received << "/" << quorum.size() << " handshake bitsets"
<< (missing_bitsets ? ", we timed out and some bitsets were not seen!" : ""));
MDEBUG(log_prefix(context) << "Collected " << stage.msgs_received << "/" << quorum.size() << " handshake bitsets"
<< (missing_bitsets ? ", we timed out and some bitsets were not seen!" : ""));
std::map<uint16_t, int> most_common_bitset;
uint16_t best_bitset = 0;
@ -1105,7 +1105,7 @@ round_state wait_for_handshake_bitsets(round_context &context, void *quorumnet_s
count = num;
}
MINFO(log_prefix(context) << "Collected from V[" << quorum_index << "], handshake bitset " << bitset_view16(bitset));
MTRACE(log_prefix(context) << "Collected from V[" << quorum_index << "], handshake bitset " << bitset_view16(bitset));
}
if (count < service_nodes::PULSE_BLOCK_REQUIRED_SIGNATURES || best_bitset == 0)
@ -1114,11 +1114,11 @@ round_state wait_for_handshake_bitsets(round_context &context, void *quorumnet_s
// which validators are online, we wait until the next round.
if (best_bitset == 0)
{
MINFO(log_prefix(context) << count << "/" << quorum.size() << " validators did not send any handshake bitset or sent an empty handshake bitset");
MDEBUG(log_prefix(context) << count << "/" << quorum.size() << " validators did not send any handshake bitset or sent an empty handshake bitset");
}
else
{
MINFO(log_prefix(context) << "We heard back from less than " << service_nodes::PULSE_BLOCK_REQUIRED_SIGNATURES << " of the validators ("
MDEBUG(log_prefix(context) << "We heard back from less than " << service_nodes::PULSE_BLOCK_REQUIRED_SIGNATURES << " of the validators ("
<< count << "/" << quorum.size() << ", waiting for next round.");
}
@ -1151,14 +1151,14 @@ round_state send_block_template(round_context &context, void *quorumnet_state, s
// TODO(doyle): These checks can be done earlier?
if (list_state.empty())
{
MINFO(log_prefix(context) << "Block producer (us) is not available on the service node list, waiting until next round");
MWARNING(log_prefix(context) << "Block producer (us) is not available on the service node list, waiting until next round");
return goto_preparing_for_next_round(context);
}
std::shared_ptr<const service_nodes::service_node_info> info = list_state[0].info;
if (!info->is_active())
{
MINFO(log_prefix(context) << "Block producer (us) is not an active service node, waiting until next round");
MWARNING(log_prefix(context) << "Block producer (us) is not an active service node, waiting until next round");
return goto_preparing_for_next_round(context);
}
@ -1377,11 +1377,13 @@ round_state send_and_wait_for_signed_blocks(round_context &context, void *quorum
uint16_t validator_index = indices[index];
auto const &[signature, received] = quorum[validator_index];
assert(received);
MDEBUG(log_prefix(context) << "Signature added: " << validator_index << ":" << context.prepare_for_round.quorum.validators[validator_index] << ", " << signature);
final_block.signatures.emplace_back(validator_index, signature);
}
// Propagate Final Block
MINFO(log_prefix(context) << "Final signed block constructed\n" << cryptonote::obj_to_json_str(final_block));
MDEBUG(log_prefix(context) << "Final signed block constructed\n" << cryptonote::obj_to_json_str(final_block));
cryptonote::block_verification_context bvc = {};
if (!core.handle_block_found(final_block, bvc))
return goto_preparing_for_next_round(context);
@ -1411,7 +1413,7 @@ void pulse::main(void *quorumnet_state, cryptonote::core &core)
if (uint64_t height = blockchain.get_current_blockchain_height(true /*lock*/); height < hf16_height)
{
for (static bool once = true; once; once = !once)
MINFO("Pulse: Network at block " << height << " is not ready for Pulse until block " << hf16_height << ", waiting");
MDEBUG("Pulse: Network at block " << height << " is not ready for Pulse until block " << hf16_height << ", waiting");
return;
}

View file

@ -57,6 +57,13 @@ namespace service_nodes
END_SERIALIZE()
};
inline std::ostream &operator<<(std::ostream &os, quorum const &q)
{
for (size_t i = 0; i < q.validators.size(); i++) os << "V[" << i << "] " << q.validators[i] << "\n";
for (size_t i = 0; i < q.workers.size(); i++) os << "W[" << i << "] " << q.workers[i] << "\n";
return os;
}
struct quorum_manager
{
std::shared_ptr<const quorum> obligations;

View file

@ -332,7 +332,7 @@ namespace service_nodes
if (!crypto::check_signature(hash, key, quorum_signature.signature))
{
LOG_PRINT_L1("Incorrect signature for vote, failed verification at height: " << height << " for voter: " << epee::string_tools::pod_to_hex(key));
LOG_PRINT_L1("Incorrect signature for vote, failed verification at height: " << height << " for voter: " << key << "\n" << quorum);
return false;
}
}