mirror of https://github.com/oxen-io/oxen-core.git
Pulse: Unify message receiving data into stage_quorum_data
This commit is contained in:
parent
ffbee907be
commit
c9a15f94d8
|
@ -78,6 +78,28 @@ enum struct sn_type
|
|||
validator,
|
||||
};
|
||||
|
||||
enum struct queueing_state
|
||||
{
|
||||
empty,
|
||||
received,
|
||||
processed,
|
||||
};
|
||||
|
||||
struct message_queue
|
||||
{
|
||||
std::array<std::pair<pulse::message, queueing_state>, service_nodes::PULSE_QUORUM_NUM_VALIDATORS> buffer;
|
||||
size_t count;
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
struct stage_quorum_data
|
||||
{
|
||||
std::array<std::pair<T, bool>, service_nodes::PULSE_QUORUM_NUM_VALIDATORS> data;
|
||||
uint16_t bitset;
|
||||
uint16_t count;
|
||||
pulse::time_point end_time;
|
||||
};
|
||||
|
||||
struct round_context
|
||||
{
|
||||
struct
|
||||
|
@ -100,18 +122,17 @@ struct round_context
|
|||
|
||||
struct
|
||||
{
|
||||
uint16_t validator_bits;
|
||||
pulse::time_point start_time;
|
||||
pulse::time_point end_time;
|
||||
bool all_received() { return validator_bits == service_nodes::pulse_validator_bit_mask(); }
|
||||
message_queue queued_messages;
|
||||
stage_quorum_data<bool> quorum;
|
||||
pulse::time_point start_time;
|
||||
pulse::time_point end_time;
|
||||
} wait_for_handshakes;
|
||||
|
||||
struct
|
||||
{
|
||||
std::array<uint16_t, service_nodes::PULSE_QUORUM_NUM_VALIDATORS> bitsets;
|
||||
int bitsets_count;
|
||||
pulse::time_point end_time;
|
||||
bool all_received() { return bitsets_count == service_nodes::PULSE_QUORUM_NUM_VALIDATORS; }
|
||||
message_queue queued_messages;
|
||||
stage_quorum_data<uint16_t> quorum;
|
||||
pulse::time_point end_time;
|
||||
} wait_for_handshake_bitsets;
|
||||
|
||||
struct
|
||||
|
@ -134,18 +155,16 @@ struct round_context
|
|||
|
||||
struct
|
||||
{
|
||||
std::array<std::pair<crypto::hash, bool>, service_nodes::PULSE_QUORUM_NUM_VALIDATORS> hashes;
|
||||
uint16_t validator_bitset;
|
||||
uint16_t count;
|
||||
pulse::time_point end_time;
|
||||
message_queue queued_messages;
|
||||
stage_quorum_data<crypto::hash> quorum;
|
||||
pulse::time_point end_time;
|
||||
} wait_for_random_value_hashes;
|
||||
|
||||
struct
|
||||
{
|
||||
std::array<std::pair<cryptonote::pulse_random_value, bool>, service_nodes::PULSE_QUORUM_NUM_VALIDATORS> values;
|
||||
uint16_t validator_bitset;
|
||||
uint16_t count;
|
||||
pulse::time_point end_time;
|
||||
message_queue queued_messages;
|
||||
stage_quorum_data<cryptonote::pulse_random_value> quorum;
|
||||
pulse::time_point end_time;
|
||||
} wait_for_random_value;
|
||||
|
||||
struct
|
||||
|
@ -155,11 +174,9 @@ struct round_context
|
|||
|
||||
struct
|
||||
{
|
||||
// TODO(doyle): DRY
|
||||
std::array<std::pair<crypto::signature, bool>, service_nodes::PULSE_QUORUM_NUM_VALIDATORS> data;
|
||||
uint16_t validator_bitset;
|
||||
uint16_t count;
|
||||
pulse::time_point end_time;
|
||||
message_queue queued_messages;
|
||||
stage_quorum_data<crypto::signature> quorum;
|
||||
pulse::time_point end_time;
|
||||
} wait_for_signed_blocks;
|
||||
|
||||
round_state state;
|
||||
|
@ -322,19 +339,33 @@ void relay_validator_handshake_bit_or_bitset(round_context const &context, void
|
|||
{
|
||||
assert(context.prepare_for_round.participant == sn_type::validator);
|
||||
|
||||
// Message
|
||||
pulse::message msg = {};
|
||||
msg.quorum_position = context.prepare_for_round.my_quorum_position;
|
||||
|
||||
if (sending_bitset)
|
||||
{
|
||||
msg.type = pulse::message_type::handshake_bitset;
|
||||
msg.handshakes.validator_bitset = context.wait_for_handshakes.validator_bits;
|
||||
msg.type = pulse::message_type::handshake_bitset;
|
||||
|
||||
// Generate the bitset from our received handshakes.
|
||||
stage_quorum_data<bool> const &quorum = context.wait_for_handshakes.quorum;
|
||||
for (size_t quorum_index = 0; quorum_index < quorum.data.size(); quorum_index++)
|
||||
{
|
||||
bool received = quorum.data[quorum_index].second;
|
||||
if (received)
|
||||
msg.handshakes.validator_bitset |= (1 << quorum_index);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
msg.type = pulse::message_type::handshake;
|
||||
}
|
||||
|
||||
crypto::generate_signature(message_signature_hash(context, msg), key.pub, key.key, msg.signature);
|
||||
|
||||
// Add our own handshake/bitset
|
||||
handle_message(nullptr, msg);
|
||||
|
||||
// Send
|
||||
cryptonote::quorumnet_pulse_relay_message_to_quorum(quorumnet_state, msg, context.prepare_for_round.quorum, false /*block_producer*/);
|
||||
}
|
||||
|
||||
|
@ -368,12 +399,11 @@ void pulse::handle_message(void *quorumnet_state, pulse::message const &msg)
|
|||
return;
|
||||
}
|
||||
|
||||
bool relay_message = false;
|
||||
switch(msg.type)
|
||||
{
|
||||
case pulse::message_type::invalid:
|
||||
assert("Invalid Code Path" != nullptr);
|
||||
break;
|
||||
return;
|
||||
|
||||
case pulse::message_type::handshake:
|
||||
{
|
||||
|
@ -382,16 +412,18 @@ void pulse::handle_message(void *quorumnet_state, pulse::message const &msg)
|
|||
if (!msg_time_check(context, msg, pulse::clock::now(), context.wait_for_handshakes.start_time, context.wait_for_handshakes.end_time))
|
||||
return;
|
||||
|
||||
uint16_t quorum_position_bit = 1 << msg.quorum_position;
|
||||
relay_message = ((context.wait_for_handshakes.validator_bits & quorum_position_bit) == 0);
|
||||
context.wait_for_handshakes.validator_bits |= quorum_position_bit;
|
||||
stage_quorum_data<bool> &quorum = context.wait_for_handshakes.quorum;
|
||||
auto &[unused_, received] = quorum.data[msg.quorum_position];
|
||||
if (received) return;
|
||||
|
||||
if (relay_message) // First time seen handshake
|
||||
{
|
||||
auto position_bitset = std::bitset<sizeof(quorum_position_bit) * 8>(quorum_position_bit);
|
||||
auto validator_bitset = std::bitset<sizeof(quorum_position_bit) * 8>(context.wait_for_handshakes.validator_bits);
|
||||
MINFO(log_prefix(context) << "Received handshake with quorum position bit (" << msg.quorum_position <<") " << position_bitset << " saved to bitset " << validator_bitset);
|
||||
}
|
||||
uint16_t const validator_bit = 1 << msg.quorum_position;
|
||||
auto const position_bitset = std::bitset<sizeof(validator_bit) * 8>(validator_bit);
|
||||
auto const validator_bitset = std::bitset<sizeof(validator_bit) * 8>(quorum.bitset);
|
||||
MINFO(log_prefix(context) << "Received handshake with quorum position bit (" << msg.quorum_position <<") " << position_bitset << " saved to bitset " << validator_bitset);
|
||||
|
||||
quorum.bitset |= validator_bit;
|
||||
quorum.count++;
|
||||
received = true;
|
||||
}
|
||||
break;
|
||||
|
||||
|
@ -400,13 +432,14 @@ void pulse::handle_message(void *quorumnet_state, pulse::message const &msg)
|
|||
if (!msg_time_check(context, msg, pulse::clock::now(), context.wait_for_handshakes.start_time, context.wait_for_handshake_bitsets.end_time))
|
||||
return;
|
||||
|
||||
uint16_t prev_bitset = context.wait_for_handshake_bitsets.bitsets[msg.quorum_position];
|
||||
if (context.prepare_for_round.participant == sn_type::validator)
|
||||
relay_message = (prev_bitset != msg.handshakes.validator_bitset);
|
||||
stage_quorum_data<uint16_t> &quorum = context.wait_for_handshake_bitsets.quorum;
|
||||
auto &[bitset, received] = quorum.data[msg.quorum_position];
|
||||
if (received) return;
|
||||
|
||||
context.wait_for_handshake_bitsets.bitsets[msg.quorum_position] = msg.handshakes.validator_bitset;
|
||||
if (prev_bitset == 0)
|
||||
context.wait_for_handshake_bitsets.bitsets_count++;
|
||||
quorum.bitset |= (1 << msg.quorum_position);
|
||||
quorum.count++;
|
||||
received = true;
|
||||
bitset = msg.handshakes.validator_bitset;
|
||||
}
|
||||
break;
|
||||
|
||||
|
@ -434,8 +467,6 @@ void pulse::handle_message(void *quorumnet_state, pulse::message const &msg)
|
|||
|
||||
context.wait_for_block_template.received = true;
|
||||
context.wait_for_block_template.block = std::move(block);
|
||||
relay_message = true;
|
||||
|
||||
}
|
||||
break;
|
||||
|
||||
|
@ -443,8 +474,6 @@ void pulse::handle_message(void *quorumnet_state, pulse::message const &msg)
|
|||
case pulse::message_type::random_value:
|
||||
{
|
||||
assert(context.prepare_for_round.participant == sn_type::validator);
|
||||
assert(msg.quorum_position < context.wait_for_random_value_hashes.hashes.size());
|
||||
|
||||
auto const end_time = msg.type == pulse::message_type::random_value_hash
|
||||
? context.wait_for_random_value_hashes.end_time
|
||||
: context.wait_for_random_value.end_time;
|
||||
|
@ -461,30 +490,54 @@ void pulse::handle_message(void *quorumnet_state, pulse::message const &msg)
|
|||
|
||||
if (msg.type == pulse::message_type::random_value_hash)
|
||||
{
|
||||
auto &[hash, hash_received] = context.wait_for_random_value_hashes.hashes[msg.quorum_position];
|
||||
if (context.state != round_state::wait_for_random_value_hashes)
|
||||
{
|
||||
// Enqueue the message until we're ready.
|
||||
auto &[entry, queued] = context.wait_for_random_value_hashes.queued_messages.buffer[msg.quorum_position];
|
||||
if (queued == queueing_state::empty)
|
||||
{
|
||||
context.wait_for_random_value_hashes.queued_messages.count++;
|
||||
entry = msg;
|
||||
queued = queueing_state::received;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
stage_quorum_data<crypto::hash> &quorum = context.wait_for_random_value_hashes.quorum;
|
||||
assert(msg.quorum_position < quorum.data.size());
|
||||
|
||||
auto &[hash, hash_received] = quorum.data[msg.quorum_position];
|
||||
if (hash_received)
|
||||
return; // Already received their hash
|
||||
|
||||
context.wait_for_random_value_hashes.validator_bitset |= validator_bit;
|
||||
context.wait_for_random_value_hashes.count++;
|
||||
quorum.bitset |= validator_bit;
|
||||
quorum.count++;
|
||||
hash = msg.random_value_hash.hash;
|
||||
hash_received = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
// TODO(doyle): This can happen too early
|
||||
auto const &[hash, hash_received] = context.wait_for_random_value_hashes.hashes[msg.quorum_position];
|
||||
if (!hash_received)
|
||||
if (context.state != round_state::wait_for_random_value)
|
||||
{
|
||||
MINFO(log_prefix(context) << "Dropping " << message_source_string(context, msg) << ". Received random value but hash not received.");
|
||||
// Enqueue the message until we're ready.
|
||||
auto &[entry, queued] = context.wait_for_random_value.queued_messages.buffer[msg.quorum_position];
|
||||
if (queued == queueing_state::empty)
|
||||
{
|
||||
context.wait_for_random_value.queued_messages.count++;
|
||||
entry = msg;
|
||||
queued = queueing_state::received;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
auto &[random_value, random_value_received] = context.wait_for_random_value.values[msg.quorum_position];
|
||||
|
||||
stage_quorum_data<cryptonote::pulse_random_value> &quorum = context.wait_for_random_value.quorum;
|
||||
auto &[random_value, random_value_received] = quorum.data[msg.quorum_position];
|
||||
if (random_value_received)
|
||||
return; // Already received their random value
|
||||
|
||||
crypto::hash derived_hash = crypto::cn_fast_hash(msg.random_value.value.data, sizeof(msg.random_value.value.data));
|
||||
crypto::hash derived_hash = crypto::cn_fast_hash(msg.random_value.value.data, sizeof(msg.random_value.value.data));
|
||||
auto const &[hash, hash_received] = context.wait_for_random_value_hashes.quorum.data[msg.quorum_position];
|
||||
if (derived_hash != hash)
|
||||
{
|
||||
MINFO(log_prefix(context) << "Dropping " << message_source_string(context, msg) << ". Rederived random value hash "
|
||||
|
@ -494,13 +547,11 @@ void pulse::handle_message(void *quorumnet_state, pulse::message const &msg)
|
|||
return;
|
||||
}
|
||||
|
||||
context.wait_for_random_value.validator_bitset |= validator_bit;
|
||||
context.wait_for_random_value.count++;
|
||||
quorum.bitset |= validator_bit;
|
||||
quorum.count++;
|
||||
random_value = msg.random_value.value;
|
||||
random_value_received = true;
|
||||
}
|
||||
|
||||
relay_message = true;
|
||||
}
|
||||
break;
|
||||
|
||||
|
@ -509,6 +560,26 @@ void pulse::handle_message(void *quorumnet_state, pulse::message const &msg)
|
|||
if (!msg_time_check(context, msg, pulse::clock::now(), context.wait_for_handshakes.start_time, context.wait_for_signed_blocks.end_time))
|
||||
return;
|
||||
|
||||
uint16_t validator_bit = 1 << msg.quorum_position;
|
||||
if ((validator_bit & context.wait_for_block_template.block.pulse.validator_bitset) == 0)
|
||||
{
|
||||
MDEBUG(log_prefix(context) << "Dropping " << message_source_string(context, msg) << ". Not a locked in participant.");
|
||||
return;
|
||||
}
|
||||
|
||||
if (context.state != round_state::wait_for_signed_blocks)
|
||||
{
|
||||
// Enqueue the message until we're ready.
|
||||
auto &[entry, queued] = context.wait_for_signed_blocks.queued_messages.buffer[msg.quorum_position];
|
||||
if (queued == queueing_state::empty)
|
||||
{
|
||||
context.wait_for_signed_blocks.queued_messages.count++;
|
||||
entry = msg;
|
||||
queued = queueing_state::received;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Execute the delayed signature verification that relies on us being in
|
||||
// the final Pulse Stage.
|
||||
if (!message_signature_check(msg, context.prepare_for_round.quorum))
|
||||
|
@ -517,28 +588,21 @@ void pulse::handle_message(void *quorumnet_state, pulse::message const &msg)
|
|||
return;
|
||||
}
|
||||
|
||||
uint16_t validator_bit = 1 << msg.quorum_position;
|
||||
if ((validator_bit & context.wait_for_block_template.block.pulse.validator_bitset) == 0)
|
||||
{
|
||||
MDEBUG(log_prefix(context) << "Dropping " << message_source_string(context, msg) << ". Not a locked in participant.");
|
||||
return;
|
||||
}
|
||||
|
||||
// Signature already verified in message_signature_check(...)
|
||||
auto &[signature, received] = context.wait_for_signed_blocks.data[msg.quorum_position];
|
||||
stage_quorum_data<crypto::signature> &quorum = context.wait_for_signed_blocks.quorum;
|
||||
auto &[signature, received] = quorum.data[msg.quorum_position];
|
||||
if (received)
|
||||
return;
|
||||
|
||||
context.wait_for_signed_blocks.validator_bitset |= validator_bit;
|
||||
context.wait_for_signed_blocks.count++;
|
||||
quorum.bitset |= validator_bit;
|
||||
quorum.count++;
|
||||
signature = msg.signature;
|
||||
received = true;
|
||||
relay_message = true;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
if (relay_message && quorumnet_state)
|
||||
if (quorumnet_state)
|
||||
cryptonote::quorumnet_pulse_relay_message_to_quorum(quorumnet_state, msg, context.prepare_for_round.quorum, context.prepare_for_round.participant == sn_type::producer);
|
||||
}
|
||||
|
||||
|
@ -924,9 +988,8 @@ event_loop submit_handshakes(round_context &context, void *quorumnet_state, serv
|
|||
assert(context.prepare_for_round.participant == sn_type::validator);
|
||||
try
|
||||
{
|
||||
context.wait_for_handshakes.validator_bits |= (1 << context.prepare_for_round.my_quorum_position); // Add myself
|
||||
relay_validator_handshake_bit_or_bitset(context, quorumnet_state, key, false /*sending_bitset*/);
|
||||
context.state = round_state::wait_for_handshakes;
|
||||
relay_validator_handshake_bit_or_bitset(context, quorumnet_state, key, false /*sending_bitset*/);
|
||||
}
|
||||
catch (std::exception const &e)
|
||||
{
|
||||
|
@ -939,25 +1002,18 @@ event_loop submit_handshakes(round_context &context, void *quorumnet_state, serv
|
|||
|
||||
event_loop wait_for_handshakes(round_context &context, void *quorumnet_state)
|
||||
{
|
||||
stage_quorum_data<bool> const &quorum = context.wait_for_handshakes.quorum;
|
||||
assert(context.prepare_for_round.participant == sn_type::validator);
|
||||
assert(context.prepare_for_round.my_quorum_position < context.wait_for_handshake_bitsets.bitsets.size());
|
||||
assert(context.prepare_for_round.my_quorum_position < quorum.data.size());
|
||||
|
||||
bool timed_out = pulse::clock::now() >= context.wait_for_handshakes.end_time;
|
||||
bool all_handshakes = context.wait_for_handshakes.all_received();
|
||||
|
||||
bool all_handshakes = quorum.count == quorum.data.size();
|
||||
if (all_handshakes || timed_out)
|
||||
{
|
||||
// Add my own bitset to the collected set.
|
||||
context.wait_for_handshake_bitsets.bitsets[context.prepare_for_round.my_quorum_position] = context.wait_for_handshakes.validator_bits;
|
||||
context.wait_for_handshake_bitsets.bitsets_count++;
|
||||
|
||||
// Log collected handshake state
|
||||
{
|
||||
bool missing_handshakes = timed_out && !all_handshakes;
|
||||
std::bitset<8 * sizeof(context.wait_for_handshakes.validator_bits)> bitset = context.wait_for_handshakes.validator_bits;
|
||||
MINFO(log_prefix(context) << "Collected validator handshakes " << bitset << (missing_handshakes ? ", we timed out and some handshakes were not seen! " : ". ") << "Sending handshake bitset and collecting other validator bitsets.");
|
||||
}
|
||||
|
||||
bool missing_handshakes = timed_out && !all_handshakes;
|
||||
std::bitset<8 * sizeof(quorum.bitset)> bitset = quorum.bitset;
|
||||
MINFO(log_prefix(context) << "Collected validator handshakes " << bitset << (missing_handshakes ? ", we timed out and some handshakes were not seen! " : ". ") << "Sending handshake bitset and collecting other validator bitsets.");
|
||||
context.state = round_state::submit_handshake_bitset;
|
||||
return event_loop::keep_running;
|
||||
}
|
||||
|
@ -970,8 +1026,8 @@ event_loop submit_handshake_bitset(round_context &context, void *quorumnet_state
|
|||
assert(context.prepare_for_round.participant == sn_type::validator);
|
||||
try
|
||||
{
|
||||
relay_validator_handshake_bit_or_bitset(context, quorumnet_state, key, true /*sending_bitset*/);
|
||||
context.state = round_state::wait_for_handshake_bitsets;
|
||||
relay_validator_handshake_bit_or_bitset(context, quorumnet_state, key, true /*sending_bitset*/);
|
||||
}
|
||||
catch(std::exception const &e)
|
||||
{
|
||||
|
@ -984,59 +1040,60 @@ event_loop submit_handshake_bitset(round_context &context, void *quorumnet_state
|
|||
|
||||
event_loop wait_for_handshake_bitsets(round_context &context)
|
||||
{
|
||||
size_t const max_bitsets = context.wait_for_handshake_bitsets.bitsets.size();
|
||||
size_t const bitsets_count = context.wait_for_handshake_bitsets.bitsets_count;
|
||||
|
||||
bool all_bitsets = context.wait_for_handshake_bitsets.all_received();
|
||||
bool timed_out = pulse::clock::now() >= context.wait_for_handshake_bitsets.end_time;
|
||||
stage_quorum_data<uint16_t> const &quorum = context.wait_for_handshake_bitsets.quorum;
|
||||
bool const timed_out = pulse::clock::now() >= context.wait_for_handshake_bitsets.end_time;
|
||||
bool const all_bitsets = quorum.count == quorum.data.size();
|
||||
if (timed_out || all_bitsets)
|
||||
{
|
||||
bool missing_bitsets = timed_out && !all_bitsets;
|
||||
MINFO(log_prefix(context)
|
||||
<< "Collected " << bitsets_count << "/" << max_bitsets << " handshake bitsets"
|
||||
<< "Collected " << quorum.count << "/" << quorum.data.size() << " handshake bitsets"
|
||||
<< (missing_bitsets ? ", we timed out and some bitsets were not seen!" : ""));
|
||||
|
||||
std::map<uint16_t, int> most_common_validator_bitset;
|
||||
uint16_t most_common_bitset = 0;
|
||||
int count = 0;
|
||||
for (size_t validator_index = 0; validator_index < max_bitsets; validator_index++)
|
||||
std::map<uint16_t, int> most_common_bitset;
|
||||
uint16_t best_bitset = 0;
|
||||
int count = 0;
|
||||
for (size_t quorum_index = 0; quorum_index < quorum.data.size(); quorum_index++)
|
||||
{
|
||||
uint16_t bits = context.wait_for_handshake_bitsets.bitsets[validator_index];
|
||||
uint16_t num = ++most_common_validator_bitset[bits];
|
||||
if (num > count)
|
||||
auto &[bitset, received] = quorum.data[quorum_index];
|
||||
uint16_t num = ++most_common_bitset[bitset];
|
||||
if (received && num > count)
|
||||
{
|
||||
most_common_bitset = bits;
|
||||
count = num;
|
||||
best_bitset = bitset;
|
||||
count = num;
|
||||
}
|
||||
|
||||
MINFO(log_prefix(context) << "Collected from V[" << validator_index << "], handshake bitset " << std::bitset<8 * sizeof(bits)>(bits));
|
||||
MINFO(log_prefix(context) << "Collected from V[" << quorum_index << "], handshake bitset " << std::bitset<8 * sizeof(bitset)>(bitset));
|
||||
}
|
||||
|
||||
int count_threshold = (service_nodes::PULSE_QUORUM_NUM_VALIDATORS * 6 / 10);
|
||||
if (count < count_threshold || most_common_bitset == 0)
|
||||
int count_threshold = (quorum.data.size() * 6 / 10);
|
||||
if (count < count_threshold || best_bitset == 0)
|
||||
{
|
||||
// Less than 60% of the validators can't come to agreement
|
||||
// about which validators are online, we wait until the
|
||||
// next round.
|
||||
if (most_common_bitset == 0)
|
||||
if (best_bitset == 0)
|
||||
{
|
||||
MINFO(log_prefix(context) << count << "/" << max_bitsets << " validators did not send any handshake bitset or sent an empty handshake bitset");
|
||||
MINFO(log_prefix(context) << count << "/" << quorum.data.size() << " validators did not send any handshake bitset or sent an empty handshake bitset");
|
||||
}
|
||||
else
|
||||
{
|
||||
MINFO(log_prefix(context)
|
||||
<< "We heard back from less than " << count_threshold << " of the validators (" << count << "/"
|
||||
<< max_bitsets << ", waiting for next round.");
|
||||
MINFO(log_prefix(context) << "We heard back from less than " << count_threshold << " of the validators ("
|
||||
<< count << "/" << quorum.data.size() << ", waiting for next round.");
|
||||
}
|
||||
|
||||
return goto_preparing_for_next_round(context);
|
||||
}
|
||||
|
||||
std::bitset<8 * sizeof(most_common_bitset)> bitset = most_common_bitset;
|
||||
context.submit_block_template.validator_bitset = most_common_bitset;
|
||||
std::bitset<8 * sizeof(best_bitset)> bitset = best_bitset;
|
||||
context.submit_block_template.validator_bitset = best_bitset;
|
||||
context.submit_block_template.validator_count = count;
|
||||
|
||||
MINFO(log_prefix(context) << count << "/" << max_bitsets << " validators agreed on the participating nodes in the quorum " << bitset << (context.prepare_for_round.participant == sn_type::producer ? "" : ". Awaiting block template from block producer"));
|
||||
MINFO(log_prefix(context) << count << "/" << quorum.data.size()
|
||||
<< " validators agreed on the participating nodes in the quorum " << bitset
|
||||
<< (context.prepare_for_round.participant == sn_type::producer
|
||||
? ""
|
||||
: ". Awaiting block template from block producer"));
|
||||
|
||||
if (context.prepare_for_round.participant == sn_type::producer)
|
||||
context.state = round_state::submit_block_template;
|
||||
|
@ -1153,27 +1210,39 @@ event_loop submit_random_value_hash(round_context &context, void *quorumnet_stat
|
|||
return event_loop::return_to_caller;
|
||||
}
|
||||
|
||||
event_loop wait_for_random_value_hashes(round_context &context)
|
||||
event_loop wait_for_random_value_hashes(round_context &context, void *quorumnet_state)
|
||||
{
|
||||
bool timed_out = pulse::clock::now() >= context.wait_for_random_value_hashes.end_time;
|
||||
bool all_hashes = context.wait_for_random_value_hashes.count == context.submit_block_template.validator_count;
|
||||
if (context.wait_for_random_value_hashes.queued_messages.count)
|
||||
{
|
||||
for (auto &[msg, queued] : context.wait_for_random_value_hashes.queued_messages.buffer)
|
||||
{
|
||||
if (queued == queueing_state::received)
|
||||
{
|
||||
queued = queueing_state::processed;
|
||||
pulse::handle_message(quorumnet_state, msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
stage_quorum_data<crypto::hash> &quorum = context.wait_for_random_value_hashes.quorum;
|
||||
bool timed_out = pulse::clock::now() >= context.wait_for_random_value_hashes.end_time;
|
||||
bool all_hashes = quorum.count == context.submit_block_template.validator_count;
|
||||
if (timed_out || all_hashes)
|
||||
{
|
||||
uint16_t const validator_bitset = context.wait_for_block_template.block.pulse.validator_bitset;
|
||||
uint16_t const received_bitset = context.wait_for_random_value_hashes.validator_bitset;
|
||||
auto const our_bitset = std::bitset<sizeof(validator_bitset) * 8>(received_bitset);
|
||||
auto const our_bitset = std::bitset<sizeof(validator_bitset) * 8>(quorum.bitset);
|
||||
|
||||
// Invariant Check
|
||||
if (timed_out && !all_hashes)
|
||||
{
|
||||
MDEBUG(log_prefix(context) << "We timed out and there were insufficient hashes, required "
|
||||
<< service_nodes::PULSE_BLOCK_REQUIRED_SIGNATURES << ", received "
|
||||
<< context.wait_for_random_value_hashes.count << " from " << our_bitset);
|
||||
<< quorum.count << " from " << our_bitset);
|
||||
return goto_preparing_for_next_round(context);
|
||||
}
|
||||
|
||||
bool unexpected_items = (received_bitset | validator_bitset) != validator_bitset;
|
||||
if (context.wait_for_random_value_hashes.count == 0 || unexpected_items)
|
||||
bool unexpected_items = (quorum.bitset | validator_bitset) != validator_bitset;
|
||||
if (quorum.count == 0 || unexpected_items)
|
||||
{
|
||||
auto block_bitset = std::bitset<sizeof(validator_bitset) * 8>(validator_bitset);
|
||||
if (unexpected_items)
|
||||
|
@ -1186,7 +1255,7 @@ event_loop wait_for_random_value_hashes(round_context &context)
|
|||
|
||||
// Accept
|
||||
context.state = round_state::submit_random_value;
|
||||
MINFO(log_prefix(context) << "Received " << context.wait_for_random_value_hashes.count << " random value hashes from " << our_bitset << (timed_out ? ". We timed out and some hashes are missing" : ""));
|
||||
MINFO(log_prefix(context) << "Received " << quorum.count << " random value hashes from " << our_bitset << (timed_out ? ". We timed out and some hashes are missing" : ""));
|
||||
return event_loop::keep_running;
|
||||
}
|
||||
|
||||
|
@ -1213,28 +1282,40 @@ event_loop submit_random_value(round_context &context, void *quorumnet_state, se
|
|||
return event_loop::keep_running;
|
||||
}
|
||||
|
||||
event_loop wait_for_random_value(round_context &context)
|
||||
event_loop wait_for_random_value(round_context &context, void *quorumnet_state)
|
||||
{
|
||||
if (context.wait_for_random_value.queued_messages.count)
|
||||
{
|
||||
for (auto &[msg, queued] : context.wait_for_random_value.queued_messages.buffer)
|
||||
{
|
||||
if (queued == queueing_state::received)
|
||||
{
|
||||
queued = queueing_state::processed;
|
||||
pulse::handle_message(quorumnet_state, msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
stage_quorum_data<cryptonote::pulse_random_value> const &quorum = context.wait_for_random_value.quorum;
|
||||
bool timed_out = pulse::clock::now() >= context.wait_for_random_value.end_time;
|
||||
bool all_values = context.wait_for_random_value.count == context.submit_block_template.validator_count;
|
||||
bool all_values = quorum.count == context.submit_block_template.validator_count;
|
||||
|
||||
if (timed_out || all_values)
|
||||
{
|
||||
uint16_t const validator_bitset = context.wait_for_block_template.block.pulse.validator_bitset;
|
||||
uint16_t const received_bitset = context.wait_for_random_value.validator_bitset;
|
||||
auto const our_bitset = std::bitset<sizeof(validator_bitset) * 8>(received_bitset);
|
||||
auto const our_bitset = std::bitset<sizeof(validator_bitset) * 8>(quorum.bitset);
|
||||
|
||||
// Invariant Check
|
||||
if (timed_out && !all_values)
|
||||
{
|
||||
MDEBUG(log_prefix(context) << "We timed out and there were insufficient random values, required "
|
||||
<< service_nodes::PULSE_BLOCK_REQUIRED_SIGNATURES << ", received "
|
||||
<< context.wait_for_random_value.count << " from " << our_bitset);
|
||||
<< quorum.count << " from " << our_bitset);
|
||||
return goto_preparing_for_next_round(context);
|
||||
}
|
||||
|
||||
bool unexpected_items = (received_bitset | validator_bitset) != validator_bitset;
|
||||
if (context.wait_for_random_value.count == 0 || unexpected_items)
|
||||
bool unexpected_items = (quorum.bitset | validator_bitset) != validator_bitset;
|
||||
if (quorum.count == 0 || unexpected_items)
|
||||
{
|
||||
auto block_bitset = std::bitset<sizeof(validator_bitset) * 8>(validator_bitset);
|
||||
if (unexpected_items)
|
||||
|
@ -1247,9 +1328,9 @@ event_loop wait_for_random_value(round_context &context)
|
|||
|
||||
// Generate Final Random Value
|
||||
crypto::hash final_hash = {};
|
||||
for (size_t index = 0; index < context.wait_for_random_value.values.size(); index++)
|
||||
for (size_t index = 0; index < quorum.data.size(); index++)
|
||||
{
|
||||
auto &[random_value, received] = context.wait_for_random_value.values[index];
|
||||
auto &[random_value, received] = quorum.data[index];
|
||||
if (received)
|
||||
{
|
||||
MDEBUG(log_prefix(context) << "Final random value seeding with V[" << index << "] " << lokimq::to_hex(tools::view_guts(random_value.data)));
|
||||
|
@ -1290,30 +1371,42 @@ event_loop submit_signed_block(round_context &context, void *quorumnet_state, se
|
|||
return event_loop::keep_running;
|
||||
}
|
||||
|
||||
event_loop wait_for_signed_blocks(round_context &context, cryptonote::core &core)
|
||||
event_loop wait_for_signed_blocks(round_context &context, void *quorumnet_state, cryptonote::core &core)
|
||||
{
|
||||
if (context.wait_for_signed_blocks.queued_messages.count)
|
||||
{
|
||||
for (auto &[msg, queued] : context.wait_for_signed_blocks.queued_messages.buffer)
|
||||
{
|
||||
if (queued == queueing_state::received)
|
||||
{
|
||||
queued = queueing_state::processed;
|
||||
pulse::handle_message(quorumnet_state, msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
stage_quorum_data<crypto::signature> &quorum = context.wait_for_signed_blocks.quorum;
|
||||
bool timed_out = pulse::clock::now() >= context.wait_for_signed_blocks.end_time;
|
||||
bool enough = context.wait_for_signed_blocks.count >= context.submit_block_template.validator_count;
|
||||
bool enough = quorum.count >= context.submit_block_template.validator_count;
|
||||
|
||||
if (timed_out || enough)
|
||||
{
|
||||
|
||||
// TODO(doyle): DRY
|
||||
uint16_t const validator_bitset = context.wait_for_block_template.block.pulse.validator_bitset;
|
||||
uint16_t const received_bitset = context.wait_for_signed_blocks.validator_bitset;
|
||||
auto const our_bitset = std::bitset<sizeof(validator_bitset) * 8>(received_bitset);
|
||||
auto const our_bitset = std::bitset<sizeof(validator_bitset) * 8>(quorum.count);
|
||||
|
||||
// Invariant Check
|
||||
if (timed_out && !enough)
|
||||
{
|
||||
MDEBUG(log_prefix(context) << "We timed out and there were insufficient signatures, required "
|
||||
<< service_nodes::PULSE_BLOCK_REQUIRED_SIGNATURES << ", received "
|
||||
<< context.wait_for_signed_blocks.count << " from " << our_bitset);
|
||||
<< quorum.count << " from " << our_bitset);
|
||||
return goto_preparing_for_next_round(context);
|
||||
}
|
||||
|
||||
bool unexpected_items = (received_bitset | validator_bitset) != validator_bitset;
|
||||
if (context.wait_for_signed_blocks.count == 0 || unexpected_items)
|
||||
bool unexpected_items = (quorum.bitset | validator_bitset) != validator_bitset;
|
||||
if (quorum.count == 0 || unexpected_items)
|
||||
{
|
||||
auto block_bitset = std::bitset<sizeof(validator_bitset) * 8>(validator_bitset);
|
||||
if (unexpected_items)
|
||||
|
@ -1336,8 +1429,8 @@ event_loop wait_for_signed_blocks(round_context &context, cryptonote::core &core
|
|||
cryptonote::block &final_block = context.wait_for_block_template.block;
|
||||
for (size_t index = 0; index < service_nodes::PULSE_BLOCK_REQUIRED_SIGNATURES; index++)
|
||||
{
|
||||
uint16_t validator_index = indices[index];
|
||||
auto &[signature, received] = context.wait_for_signed_blocks.data[validator_index];
|
||||
uint16_t validator_index = indices[index];
|
||||
auto const &[signature, received] = quorum.data[validator_index];
|
||||
assert(received);
|
||||
final_block.signatures.emplace_back(validator_index, signature);
|
||||
}
|
||||
|
@ -1423,7 +1516,7 @@ void pulse::main(void *quorumnet_state, cryptonote::core &core)
|
|||
break;
|
||||
|
||||
case round_state::wait_for_random_value_hashes:
|
||||
loop = wait_for_random_value_hashes(context);
|
||||
loop = wait_for_random_value_hashes(context, quorumnet_state);
|
||||
break;
|
||||
|
||||
case round_state::submit_random_value:
|
||||
|
@ -1431,7 +1524,7 @@ void pulse::main(void *quorumnet_state, cryptonote::core &core)
|
|||
break;
|
||||
|
||||
case round_state::wait_for_random_value:
|
||||
loop = wait_for_random_value(context);
|
||||
loop = wait_for_random_value(context, quorumnet_state);
|
||||
break;
|
||||
|
||||
case round_state::submit_signed_block:
|
||||
|
@ -1439,7 +1532,7 @@ void pulse::main(void *quorumnet_state, cryptonote::core &core)
|
|||
break;
|
||||
|
||||
case round_state::wait_for_signed_blocks:
|
||||
loop = wait_for_signed_blocks(context, core);
|
||||
loop = wait_for_signed_blocks(context, quorumnet_state, core);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,6 +18,15 @@ namespace service_nodes {
|
|||
|
||||
constexpr size_t PULSE_QUORUM_NUM_VALIDATORS = 0;
|
||||
constexpr size_t PULSE_BLOCK_REQUIRED_SIGNATURES = 0;
|
||||
#else
|
||||
#if 1
|
||||
constexpr auto PULSE_ROUND_TIME = 20s;
|
||||
constexpr auto PULSE_WAIT_FOR_HANDSHAKES_DURATION = 3s;
|
||||
constexpr auto PULSE_WAIT_FOR_OTHER_VALIDATOR_HANDSHAKES_DURATION = 3s;
|
||||
constexpr auto PULSE_WAIT_FOR_BLOCK_TEMPLATE_DURATION = 3s;
|
||||
constexpr auto PULSE_WAIT_FOR_RANDOM_VALUE_HASH_DURATION = 3s;
|
||||
constexpr auto PULSE_WAIT_FOR_RANDOM_VALUE_DURATION = 3s;
|
||||
constexpr auto PULSE_WAIT_FOR_SIGNED_BLOCK_DURATION = 5s;
|
||||
#else
|
||||
constexpr auto PULSE_ROUND_TIME = 60s;
|
||||
constexpr auto PULSE_WAIT_FOR_HANDSHAKES_DURATION = 10s;
|
||||
|
@ -26,6 +35,7 @@ namespace service_nodes {
|
|||
constexpr auto PULSE_WAIT_FOR_RANDOM_VALUE_HASH_DURATION = 10s;
|
||||
constexpr auto PULSE_WAIT_FOR_RANDOM_VALUE_DURATION = 10s;
|
||||
constexpr auto PULSE_WAIT_FOR_SIGNED_BLOCK_DURATION = 10s;
|
||||
#endif
|
||||
|
||||
constexpr size_t PULSE_QUORUM_NUM_VALIDATORS = 7;
|
||||
constexpr size_t PULSE_BLOCK_REQUIRED_SIGNATURES = 7; // A block must have exactly N signatures to be considered properly
|
||||
|
|
Loading…
Reference in New Issue