Pulse: Add signed block propagation

This commit is contained in:
Doyle 2020-08-12 18:22:23 +10:00
parent b5854a11b0
commit c264563f91
4 changed files with 223 additions and 23 deletions

View file

@ -3,6 +3,7 @@
#include <chrono>
#include "misc_log_ex.h"
#include "common/random.h"
#include "cryptonote_core.h"
#include "cryptonote_basic/hardfork.h"
@ -34,6 +35,9 @@ enum struct round_state
submit_random_value,
wait_for_random_value,
submit_signed_block,
wait_for_signed_blocks,
};
constexpr std::string_view round_state_string(round_state state)
@ -59,6 +63,9 @@ constexpr std::string_view round_state_string(round_state state)
case round_state::submit_random_value: return "Submit Random Value"sv;
case round_state::wait_for_random_value: return "Wait For Random Value"sv;
case round_state::submit_signed_block: return "Submit Signed Block"sv;
case round_state::wait_for_signed_blocks: return "Wait For Signed Blocks"sv;
}
return "Invalid2"sv;
@ -141,6 +148,20 @@ struct round_context
pulse::time_point end_time;
} wait_for_random_value;
struct
{
std::string blob;
} submit_signed_block;
struct
{
// TODO(doyle): DRY
std::array<std::pair<crypto::signature, bool>, service_nodes::PULSE_QUORUM_NUM_VALIDATORS> data;
uint16_t validator_bitset;
uint16_t count;
pulse::time_point end_time;
} wait_for_signed_blocks;
round_state state;
};
@ -182,7 +203,7 @@ bool msg_time_check(round_context const &context, pulse::message const &msg, pul
return true;
}
crypto::hash make_message_signature_hash(round_context const &context, pulse::message const &msg)
crypto::hash message_signature_hash(round_context const &context, pulse::message const &msg)
{
assert(context.state >= round_state::wait_for_next_block);
crypto::hash result = {};
@ -223,6 +244,12 @@ crypto::hash make_message_signature_hash(round_context const &context, pulse::me
result = crypto::cn_fast_hash(buf.data(), buf.size());
}
break;
case pulse::message_type::signed_block:
{
result = crypto::cn_fast_hash(context.submit_signed_block.blob.data(), context.submit_signed_block.blob.size());
}
break;
}
return result;
@ -258,7 +285,8 @@ bool message_signature_check(pulse::message const &msg, service_nodes::quorum co
case pulse::message_type::handshake: /* FALLTHRU */
case pulse::message_type::handshake_bitset: /* FALLTHRU */
case pulse::message_type::random_value_hash: /* FALLTHRU */
case pulse::message_type::random_value:
case pulse::message_type::random_value: /* FALLTHRU */
case pulse::message_type::signed_block:
{
if (msg.quorum_position >= static_cast<int>(quorum.validators.size()))
{
@ -283,8 +311,7 @@ bool message_signature_check(pulse::message const &msg, service_nodes::quorum co
break;
}
// Verify
if (!crypto::check_signature(make_message_signature_hash(context, msg), *key, msg.signature))
if (!crypto::check_signature(message_signature_hash(context, msg), *key, msg.signature))
{
MERROR(log_prefix(context) << "Signature for " << message_source_string(context, msg) << " at height " << context.wait_for_next_block.height << "; is invalid");
return false;
@ -309,7 +336,7 @@ void relay_validator_handshake_bit_or_bitset(round_context const &context, void
msg.type = pulse::message_type::handshake;
}
crypto::generate_signature(make_message_signature_hash(context, msg), key.pub, key.key, msg.signature);
crypto::generate_signature(message_signature_hash(context, msg), key.pub, key.key, msg.signature);
cryptonote::quorumnet_pulse_relay_message_to_quorum(quorumnet_state, msg, context.prepare_for_round.quorum, false /*block_producer*/);
}
@ -455,6 +482,36 @@ void pulse::handle_message(void *quorumnet_state, pulse::message const &msg)
relay_message = true;
}
break;
case pulse::message_type::signed_block:
{
if (!msg_time_check(context, msg, pulse::clock::now(), context.wait_for_handshakes.start_time, context.wait_for_signed_blocks.end_time))
return;
// TODO(doyle): Its possible to receive the signed block before we've produced
// the final random value. We need to delay verification to after we're
// ready for it. This is the case for all the stages that rely on
// previous stages data being ready.
uint16_t validator_bit = 1 << msg.quorum_position;
if ((validator_bit & context.wait_for_block_template.block.pulse.validator_bitset) == 0)
{
MINFO(log_prefix(context) << "Dropping " << message_source_string(context, msg) << ". Not a locked in participant.");
return;
}
// Signature already verified in message_signature_check(...)
auto &[signature, received] = context.wait_for_signed_blocks.data[msg.quorum_position];
if (received)
return;
context.wait_for_signed_blocks.validator_bitset |= validator_bit;
context.wait_for_signed_blocks.count++;
signature = msg.signature;
received = true;
relay_message = true;
}
break;
}
if (relay_message && quorumnet_state)
@ -753,6 +810,7 @@ event_loop prepare_for_round(round_context &context, service_nodes::service_node
context.wait_for_block_template.end_time = context.wait_for_handshake_bitsets.end_time + service_nodes::PULSE_WAIT_FOR_BLOCK_TEMPLATE_DURATION;
context.wait_for_random_value_hashes.end_time = context.wait_for_block_template.end_time + service_nodes::PULSE_WAIT_FOR_RANDOM_VALUE_HASH_DURATION;
context.wait_for_random_value.end_time = context.wait_for_random_value_hashes.end_time + service_nodes::PULSE_WAIT_FOR_RANDOM_VALUE_DURATION;
context.wait_for_signed_blocks.end_time = context.wait_for_random_value.end_time + service_nodes::PULSE_WAIT_FOR_SIGNED_BLOCK_DURATION;
context.prepare_for_round.quorum =
service_nodes::generate_pulse_quorum(blockchain.nettype(),
@ -1003,7 +1061,7 @@ event_loop submit_block_template(round_context &context, service_nodes::service_
pulse::message msg = {};
msg.type = pulse::message_type::block_template;
msg.block_template.blob = cryptonote::t_serializable_object_to_blob(block);
crypto::generate_signature(make_message_signature_hash(context, msg), key.pub, key.key, msg.signature);
crypto::generate_signature(message_signature_hash(context, msg), key.pub, key.key, msg.signature);
// Send
MINFO(log_prefix(context) << "Validators are handshaken and ready, sending block template from producer (us) to validators.\n" << cryptonote::obj_to_json_str(block));
@ -1060,7 +1118,7 @@ event_loop submit_random_value_hash(round_context &context, void *quorumnet_stat
msg.type = pulse::message_type::random_value_hash;
msg.quorum_position = context.prepare_for_round.my_quorum_position;
msg.random_value_hash.hash = crypto::cn_fast_hash(context.submit_random_value_hash.value.data, sizeof(context.submit_random_value_hash.value.data));
crypto::generate_signature(make_message_signature_hash(context, msg), key.pub, key.key, msg.signature);
crypto::generate_signature(message_signature_hash(context, msg), key.pub, key.key, msg.signature);
// Add Ourselves
handle_message(nullptr /*quorumnet_state*/, msg);
@ -1120,15 +1178,15 @@ event_loop submit_random_value(round_context &context, void *quorumnet_state, se
msg.type = pulse::message_type::random_value;
msg.quorum_position = context.prepare_for_round.my_quorum_position;
msg.random_value.value = context.submit_random_value_hash.value;
crypto::generate_signature(make_message_signature_hash(context, msg), key.pub, key.key, msg.signature);
crypto::generate_signature(message_signature_hash(context, msg), key.pub, key.key, msg.signature);
// Add Ourselves
handle_message(nullptr /*quorumnet_state*/, msg);
// Send
cryptonote::quorumnet_pulse_relay_message_to_quorum(quorumnet_state, msg, context.prepare_for_round.quorum, false /*block_producer*/);
context.state = round_state::wait_for_random_value;
return event_loop::return_to_caller;
cryptonote::quorumnet_pulse_relay_message_to_quorum(quorumnet_state, msg, context.prepare_for_round.quorum, false /*block_producer*/);
return event_loop::keep_running;
}
event_loop wait_for_random_value(round_context &context)
@ -1176,15 +1234,100 @@ event_loop wait_for_random_value(round_context &context)
}
}
cryptonote::pulse_random_value &final_random_value = context.wait_for_block_template.block.pulse.random_value;
cryptonote::block &block = context.wait_for_block_template.block;
cryptonote::pulse_random_value &final_random_value = block.pulse.random_value;
std::memcpy(final_random_value.data, final_hash.data, sizeof(final_random_value.data));
MINFO(log_prefix(context) << "Block final random value " << lokimq::to_hex(tools::view_guts(final_random_value.data)) << " generated from validators " << our_bitset);
context.submit_signed_block.blob = cryptonote::t_serializable_object_to_blob(block);
context.state = round_state::submit_signed_block;
return event_loop::keep_running;
}
return event_loop::return_to_caller;
}
event_loop submit_signed_block(round_context &context, void *quorumnet_state, service_nodes::service_node_keys const &key)
{
assert(context.prepare_for_round.participant == sn_type::validator);
// Message
pulse::message msg = {};
msg.type = pulse::message_type::signed_block;
msg.quorum_position = context.prepare_for_round.my_quorum_position;
crypto::generate_signature(message_signature_hash(context, msg), key.pub, key.key, msg.signature);
// Add Ourselves
handle_message(nullptr /*quorumnet_state*/, msg);
// Send
context.state = round_state::wait_for_signed_blocks;
cryptonote::quorumnet_pulse_relay_message_to_quorum(quorumnet_state, msg, context.prepare_for_round.quorum, false /*block_producer*/);
return event_loop::keep_running;
}
event_loop wait_for_signed_blocks(round_context &context, cryptonote::core &core)
{
bool timed_out = pulse::clock::now() >= context.wait_for_signed_blocks.end_time;
bool enough = context.wait_for_signed_blocks.count >= context.submit_block_template.validator_count;
if (timed_out || enough)
{
// TODO(doyle): DRY
uint16_t const validator_bitset = context.wait_for_block_template.block.pulse.validator_bitset;
uint16_t const received_bitset = context.wait_for_signed_blocks.validator_bitset;
auto const our_bitset = std::bitset<sizeof(validator_bitset) * 8>(received_bitset);
// Invariant Check
if (timed_out && !enough)
{
MDEBUG(log_prefix(context) << "We timed out and there were insufficient signatures, required "
<< service_nodes::PULSE_BLOCK_REQUIRED_SIGNATURES << ", received "
<< context.wait_for_signed_blocks.count << " from " << our_bitset);
return goto_preparing_for_next_round(context);
}
bool unexpected_items = (received_bitset | validator_bitset) != validator_bitset;
if (context.wait_for_signed_blocks.count == 0 || unexpected_items)
{
auto block_bitset = std::bitset<sizeof(validator_bitset) * 8>(validator_bitset);
if (unexpected_items)
MERROR(log_prefix(context) << "Internal error, unexpected block validator bitset is " << block_bitset << ", our bitset was " << our_bitset);
else
MERROR(log_prefix(context) << "Internal error, unexpected empty bitset received, we expected " << block_bitset);
return goto_preparing_for_next_round(context);
}
// Select signatures randomly so we don't always just take the first N required signatures.
// Then sort just the first N required signatures, so signatures are added
// to the block in sorted order, but were chosen randomly.
std::array<size_t, service_nodes::PULSE_QUORUM_NUM_VALIDATORS> indices = {};
std::iota(indices.begin(), indices.end(), 0);
tools::shuffle_portable(indices.begin(), indices.end(), tools::rng);
std::sort(indices.begin(), indices.begin() + service_nodes::PULSE_BLOCK_REQUIRED_SIGNATURES);
// Add Signatures
cryptonote::block &final_block = context.wait_for_block_template.block;
for (size_t index = 0; index < service_nodes::PULSE_BLOCK_REQUIRED_SIGNATURES; index++)
{
uint16_t validator_index = indices[index];
auto &[signature, received] = context.wait_for_signed_blocks.data[validator_index];
assert(received);
final_block.signatures.emplace_back(validator_index, signature);
}
// Propagate Final Block
MINFO(log_prefix(context) << "Final signed block received\n" << cryptonote::obj_to_json_str(final_block));
cryptonote::block_verification_context bvc = {};
core.handle_block_found(final_block, bvc);
context.state = round_state::wait_for_next_block;
return event_loop::keep_running;
}
return event_loop::keep_running;
return event_loop::return_to_caller;
}
void pulse::main(void *quorumnet_state, cryptonote::core &core)
@ -1266,6 +1409,14 @@ void pulse::main(void *quorumnet_state, cryptonote::core &core)
case round_state::wait_for_random_value:
loop = wait_for_random_value(context);
break;
case round_state::submit_signed_block:
loop = submit_signed_block(context, quorumnet_state, key);
break;
case round_state::wait_for_signed_blocks:
loop = wait_for_signed_blocks(context, core);
break;
}
}
}

View file

@ -34,6 +34,7 @@ enum struct message_type : uint8_t
block_template,
random_value_hash,
random_value,
signed_block,
};
constexpr std::string_view message_type_string(message_type type)
@ -46,6 +47,7 @@ constexpr std::string_view message_type_string(message_type type)
case message_type::block_template: return "Block Template"sv;
case message_type::random_value_hash: return "Random Value Hash"sv;
case message_type::random_value: return "Random Value"sv;
case message_type::signed_block: return "Signed Block"sv;
}
return "Invalid2"sv;
}

View file

@ -9,21 +9,23 @@ namespace service_nodes {
constexpr size_t PULSE_QUORUM_ENTROPY_LAG = 21; // How many blocks back from the tip of the Blockchain to source entropy for the Pulse quorums.
#if defined(LOKI_ENABLE_INTEGRATION_TEST_HOOKS)
constexpr auto PULSE_ROUND_TIME = 20s;
constexpr auto PULSE_WAIT_FOR_HANDSHAKES_DURATION = 4s;
constexpr auto PULSE_WAIT_FOR_OTHER_VALIDATOR_HANDSHAKES_DURATION = 4s;
constexpr auto PULSE_WAIT_FOR_BLOCK_TEMPLATE_DURATION = 4s;
constexpr auto PULSE_WAIT_FOR_RANDOM_VALUE_HASH_DURATION = 4s;
constexpr auto PULSE_WAIT_FOR_RANDOM_VALUE_DURATION = 4s;
constexpr auto PULSE_WAIT_FOR_HANDSHAKES_DURATION = 3s;
constexpr auto PULSE_WAIT_FOR_OTHER_VALIDATOR_HANDSHAKES_DURATION = 3s;
constexpr auto PULSE_WAIT_FOR_BLOCK_TEMPLATE_DURATION = 3s;
constexpr auto PULSE_WAIT_FOR_RANDOM_VALUE_HASH_DURATION = 3s;
constexpr auto PULSE_WAIT_FOR_RANDOM_VALUE_DURATION = 3s;
constexpr auto PULSE_WAIT_FOR_SIGNED_BLOCK_DURATION = 5s;
constexpr size_t PULSE_QUORUM_NUM_VALIDATORS = 0;
constexpr size_t PULSE_BLOCK_REQUIRED_SIGNATURES = 0;
#else
constexpr auto PULSE_ROUND_TIME = 60s;
constexpr auto PULSE_WAIT_FOR_HANDSHAKES_DURATION = 12s;
constexpr auto PULSE_WAIT_FOR_OTHER_VALIDATOR_HANDSHAKES_DURATION = 12s;
constexpr auto PULSE_WAIT_FOR_BLOCK_TEMPLATE_DURATION = 12s;
constexpr auto PULSE_WAIT_FOR_RANDOM_VALUE_HASH_DURATION = 12s;
constexpr auto PULSE_WAIT_FOR_RANDOM_VALUE_DURATION = 12s;
constexpr auto PULSE_WAIT_FOR_HANDSHAKES_DURATION = 10s;
constexpr auto PULSE_WAIT_FOR_OTHER_VALIDATOR_HANDSHAKES_DURATION = 10s;
constexpr auto PULSE_WAIT_FOR_BLOCK_TEMPLATE_DURATION = 10s;
constexpr auto PULSE_WAIT_FOR_RANDOM_VALUE_HASH_DURATION = 10s;
constexpr auto PULSE_WAIT_FOR_RANDOM_VALUE_DURATION = 10s;
constexpr auto PULSE_WAIT_FOR_SIGNED_BLOCK_DURATION = 10s;
constexpr size_t PULSE_QUORUM_NUM_VALIDATORS = 7;
constexpr size_t PULSE_BLOCK_REQUIRED_SIGNATURES = 7; // A block must have exactly N signatures to be considered properly
@ -38,7 +40,8 @@ namespace service_nodes {
PULSE_WAIT_FOR_OTHER_VALIDATOR_HANDSHAKES_DURATION +
PULSE_WAIT_FOR_BLOCK_TEMPLATE_DURATION +
PULSE_WAIT_FOR_RANDOM_VALUE_HASH_DURATION +
PULSE_WAIT_FOR_RANDOM_VALUE_DURATION);
PULSE_WAIT_FOR_RANDOM_VALUE_DURATION +
PULSE_WAIT_FOR_SIGNED_BLOCK_DURATION);
static_assert(PULSE_QUORUM_NUM_VALIDATORS >= PULSE_BLOCK_REQUIRED_SIGNATURES);
static_assert(PULSE_QUORUM_ENTROPY_LAG >= PULSE_QUORUM_SIZE, "We need to pull atleast PULSE_QUORUM_SIZE number of blocks from the Blockchain, we can't if the amount of blocks to go back from the tip of the Blockchain is less than the blocks we need.");

View file

@ -1434,11 +1434,13 @@ const std::string PULSE_CMD_VALIDATOR_BIT = "validator_bit";
const std::string PULSE_CMD_BLOCK_TEMPLATE = "block_template";
const std::string PULSE_CMD_RANDOM_VALUE_HASH = "random_value_hash";
const std::string PULSE_CMD_RANDOM_VALUE = "random_value";
const std::string PULSE_CMD_SIGNED_BLOCK = "signed_block";
const std::string PULSE_CMD_SEND_VALIDATOR_BITSET = PULSE_CMD_CATEGORY + "." + PULSE_CMD_VALIDATOR_BITSET;
const std::string PULSE_CMD_SEND_VALIDATOR_BIT = PULSE_CMD_CATEGORY + "." + PULSE_CMD_VALIDATOR_BIT;
const std::string PULSE_CMD_SEND_BLOCK_TEMPLATE = PULSE_CMD_CATEGORY + "." + PULSE_CMD_BLOCK_TEMPLATE;
const std::string PULSE_CMD_SEND_RANDOM_VALUE_HASH = PULSE_CMD_CATEGORY + "." + PULSE_CMD_RANDOM_VALUE_HASH;
const std::string PULSE_CMD_SEND_RANDOM_VALUE = PULSE_CMD_CATEGORY + "." + PULSE_CMD_RANDOM_VALUE;
const std::string PULSE_CMD_SEND_SIGNED_BLOCK = PULSE_CMD_CATEGORY + "." + PULSE_CMD_SIGNED_BLOCK;
void pulse_relay_message_to_quorum(void *self, pulse::message const &msg, service_nodes::quorum const &quorum, bool block_producer)
{
@ -1503,6 +1505,14 @@ void pulse_relay_message_to_quorum(void *self, pulse::message const &msg, servic
{PULSE_TAG_RANDOM_VALUE, tools::view_guts(msg.random_value.value)}};
}
break;
case pulse::message_type::signed_block:
{
command = PULSE_CMD_SEND_SIGNED_BLOCK;
data = {{PULSE_TAG_QUORUM_POSITION, msg.quorum_position},
{PULSE_TAG_SIGNATURE, tools::view_guts(msg.signature)}};
}
break;
}
QnetState &qnet = *static_cast<QnetState *>(self);
@ -1709,6 +1719,39 @@ void handle_pulse_random_value(Message &m, QnetState &qnet)
qnet.lmq.job([self, data = std::move(msg)]() { pulse::handle_message(self, data); }, qnet.core.pulse_thread_id());
}
void handle_pulse_signed_block(Message &m, QnetState &qnet)
{
if (m.data.size() != 1)
throw std::runtime_error(std::string("Rejecting pulse signed block expected one data entry not ") + std::to_string(m.data.size()));
bt_dict_consumer data{m.data[0]};
int quorum_position = -1;
crypto::signature signature = {};
{
// TODO(doyle): DRY
std::string_view INVALID_ARG_PREFIX = "Invalid pulse signed block: missing required field '"sv;
if (auto const &tag = PULSE_TAG_QUORUM_POSITION; data.skip_until(tag))
quorum_position = data.consume_integer<int>();
else
throw std::invalid_argument(std::string(INVALID_ARG_PREFIX) + std::string(tag) + "'");
if (auto const &tag = PULSE_TAG_SIGNATURE; data.skip_until(tag)) {
auto sig_str = data.consume_string_view();
signature = convert_string_view_bytes_to_signature(sig_str);
} else {
throw std::invalid_argument(std::string(INVALID_ARG_PREFIX) + std::string(tag) + "'");
}
}
pulse::message msg = {};
msg.type = pulse::message_type::signed_block;
msg.quorum_position = quorum_position;
msg.signature = signature;
auto *self = reinterpret_cast<void *>(&qnet);
qnet.lmq.job([self, data = std::move(msg)]() { pulse::handle_message(self, data); }, qnet.core.pulse_thread_id());
}
} // end empty namespace
@ -1765,6 +1808,7 @@ void setup_endpoints(QnetState& qnet) {
.add_command(PULSE_CMD_BLOCK_TEMPLATE, [&qnet](Message& m) { handle_pulse_block_template(m, qnet); })
.add_command(PULSE_CMD_RANDOM_VALUE_HASH, [&qnet](Message& m) { handle_pulse_random_value_hash(m, qnet); })
.add_command(PULSE_CMD_RANDOM_VALUE, [&qnet](Message& m) { handle_pulse_random_value(m, qnet); })
.add_command(PULSE_CMD_SIGNED_BLOCK, [&qnet](Message& m) { handle_pulse_signed_block(m, qnet); })
;
// Compatibility aliases. No longer used since 7.1.4, but can still be received from previous