Pulse: Add relay signed pulse block template pulse round

This commit is contained in:
Doyle 2020-08-05 14:31:17 +10:00
parent bd13791082
commit ba5f955f87
7 changed files with 421 additions and 174 deletions

View File

@ -278,10 +278,11 @@ namespace cryptonote
quorumnet_send_pulse_validator_handshake_bit_proc *quorumnet_send_pulse_validator_handshake_bit = [](void *, service_nodes::quorum const &, crypto::hash const &) -> void { need_core_init(); };
quorumnet_send_pulse_validator_handshake_bitset_proc *quorumnet_send_pulse_validator_handshake_bitset = [](void *, service_nodes::quorum const &, crypto::hash const &, uint16_t) -> void { need_core_init(); };
quorumnet_send_pulse_block_template_proc *quorumnet_send_pulse_block_template = [](void *, std::string &&blob, crypto::signature const &, service_nodes::quorum const &) -> void { need_core_init(); };
quorumnet_pulse_pump_messages_proc *quorumnet_pulse_pump_messages = [](void*, pulse::message &, pulse::time_point) -> bool { need_core_init(); return false; };
quorumnet_pulse_relay_message_to_quorum_proc *quorumnet_pulse_relay_message_to_quorum = [](void *, pulse::message const &, service_nodes::quorum const &) -> void { need_core_init(); };
quorumnet_pulse_relay_message_to_quorum_proc *quorumnet_pulse_relay_message_to_quorum = [](void *, pulse::message const &, service_nodes::quorum const &, bool) -> void { need_core_init(); };
//-----------------------------------------------------------------------------------------------
core::core()

View File

@ -105,13 +105,14 @@ namespace cryptonote
// When sending the validator handshake bitset, the message is also sent to the block producer.
using quorumnet_send_pulse_validator_handshake_bit_proc = void (void *self, service_nodes::quorum const &quorum, crypto::hash const &top_hash);
using quorumnet_send_pulse_validator_handshake_bitset_proc = void (void *self, service_nodes::quorum const &quorum, crypto::hash const &top_hash, uint16_t handshake_bitset);
using quorumnet_send_pulse_block_template_proc = void (void *, std::string &&blob, crypto::signature const &, service_nodes::quorum const &);
// Blocking call that waits on a message from the Pulse message queue which are messages for Pulse received via QuorumNet until the end time has passed.
// end_time: Set a clock time to which the call can return if there are no more messages queued up.
// msg: When function returns true, the pumped message is written to msg.
// return: False if there is no message and the clock has passed the end_time, true if there's a message.
using quorumnet_pulse_pump_messages_proc = bool (void *self, pulse::message &msg, pulse::time_point end_time);
// Relay a Pulse message to members specified in the quorum excluding the originating message owner.
using quorumnet_pulse_relay_message_to_quorum_proc = void (void *, pulse::message const &msg, service_nodes::quorum const &quorum);
using quorumnet_pulse_relay_message_to_quorum_proc = void (void *, pulse::message const &msg, service_nodes::quorum const &quorum, bool block_producer);
extern quorumnet_new_proc *quorumnet_new;
extern quorumnet_delete_proc *quorumnet_delete;
@ -120,6 +121,7 @@ namespace cryptonote
extern quorumnet_send_pulse_validator_handshake_bit_proc *quorumnet_send_pulse_validator_handshake_bit;
extern quorumnet_send_pulse_validator_handshake_bitset_proc *quorumnet_send_pulse_validator_handshake_bitset;
extern quorumnet_send_pulse_block_template_proc *quorumnet_send_pulse_block_template;
extern quorumnet_pulse_pump_messages_proc *quorumnet_pulse_pump_messages;
extern quorumnet_pulse_relay_message_to_quorum_proc *quorumnet_pulse_relay_message_to_quorum;

View File

@ -1072,7 +1072,7 @@ namespace cryptonote
const blobdata bd = get_block_hashing_blob(b);
const uint8_t hf_version = b.major_version;
#if defined(LOKI_ENABLE_INTEGRATION_TEST_HOOKS)
#if defined(LOKI_INTEGRATION_TESTS)
miners = 0;
#endif

View File

@ -18,9 +18,28 @@ enum struct round_state
wait_for_handshakes,
wait_for_other_validator_handshake_bitsets,
submit_block_template,
wait_for_block_template,
terminate,
};
constexpr std::string_view round_state_string(round_state state)
{
using namespace std::literals;
switch(state)
{
default: assert("Invalid Code Path" == nullptr);
case round_state::wait_next_block: return "Wait Next Block"sv;
case round_state::wait_for_round: return "Wait For Round"sv;
case round_state::round_starts: return "Round Starts"sv;
case round_state::wait_for_handshakes: return "Wait For Handshakes"sv;
case round_state::wait_for_other_validator_handshake_bitsets: return "Wait For Validator Handshake Bitsets"sv;
case round_state::submit_block_template: return "Submit Block Template"sv;
case round_state::wait_for_block_template: return "Wait For Block Template"sv;
case round_state::terminate: return "Terminate"sv;
}
}
struct round_context
{
struct
@ -61,6 +80,13 @@ struct round_context
{
uint16_t validator_bitset;
} submit_block_template;
struct
{
bool received;
cryptonote::block block;
pulse::time_point end_time;
} wait_for_block_template;
};
namespace
@ -148,7 +174,7 @@ bool msg_time_check(uint64_t height, round_context const &context, pulse::messag
if (now < start || now >= end)
{
std::stringstream stream;
stream << log_prefix(height, context) << "Dropping " << pulse::message_type_string(msg.type) << " message from validator " << msg.quorum_position << ", message arrived ";
stream << log_prefix(height, context) << "Dropping " << pulse::message_type_string(msg.type) << " message from validator " << msg.handshakes.quorum_position << ", message arrived ";
if (now < start)
{
@ -404,8 +430,9 @@ void pulse::main(pulse::state &state, void *quorumnet_state, cryptonote::core &c
//
pulse::time_point const start_time = context.wait_next_block.round_0_start_time +
(context.wait_next_block.round * service_nodes::PULSE_TIME_PER_BLOCK);
context.wait_for_handshakes.end_time = start_time + service_nodes::PULSE_WAIT_FOR_HANDSHAKES_DURATION;
context.wait_for_other_validator_handshake_bitsets.end_time = context.wait_for_handshakes.end_time + service_nodes::PULSE_WAIT_FOR_OTHER_VALIDATOR_HANDSHAKES_DURATION;
context.wait_for_handshakes.end_time = start_time + service_nodes::PULSE_WAIT_FOR_HANDSHAKES_DURATION;
context.wait_for_other_validator_handshake_bitsets.end_time = context.wait_for_handshakes.end_time + service_nodes::PULSE_WAIT_FOR_OTHER_VALIDATOR_HANDSHAKES_DURATION;
context.wait_for_block_template.end_time = context.wait_for_other_validator_handshake_bitsets.end_time + service_nodes::PULSE_WAIT_FOR_BLOCK_TEMPLATE_DURATION;
//
// NOTE: Quorum
@ -438,18 +465,16 @@ void pulse::main(pulse::state &state, void *quorumnet_state, cryptonote::core &c
//
// NOTE: Quorum participation
//
bool validator = false;
if (key.pub == context.wait_for_round.quorum.workers[0])
{
// NOTE: Producer doesn't send handshakes, they only collect the
// handshake bitsets from the other validators to determine who to
// lock in for this round in the block template.
MGINFO(log_prefix(pulse_height, context) << "We are the block producer for height " << pulse_height << " in round " << +context.wait_next_block.round << ", awaiting validator handshake bitsets.");
context.round_starts.is_producer = true;
round_state = round_state::wait_for_other_validator_handshake_bitsets;
}
else
{
bool validator = false;
for (size_t index = 0; index < context.wait_for_round.quorum.validators.size(); index++)
{
auto const &validator_key = context.wait_for_round.quorum.validators[index];
@ -460,15 +485,13 @@ void pulse::main(pulse::state &state, void *quorumnet_state, cryptonote::core &c
break;
}
}
}
if (validator || context.round_starts.is_producer)
{
context.round_starts.node_name =
context.round_starts.is_producer ? "W[0]" : "V[" + std::to_string(context.round_starts.my_quorum_position) + "]";
}
if (validator || context.round_starts.is_producer)
{
if (validator)
{
context.round_starts.node_name = "V[" + std::to_string(context.round_starts.my_quorum_position) + "]";
try
{
context.wait_for_handshakes.validator_bits |= (1 << context.round_starts.my_quorum_position); // Add myself
@ -485,10 +508,16 @@ void pulse::main(pulse::state &state, void *quorumnet_state, cryptonote::core &c
}
else
{
MGINFO(log_prefix(pulse_height, context) << "We are not a pulse validator. Waiting for next pulse round or block.");
round_state = thread_sleep(sleep_until::next_block_or_round, state, blockchain, context, pulse_mutex, pulse_height);
context.round_starts.node_name = "W[0]";
MGINFO(log_prefix(pulse_height, context) << "We are the block producer for height " << pulse_height << " in round " << +context.wait_next_block.round << ", awaiting validator handshake bitsets.");
round_state = round_state::wait_for_other_validator_handshake_bitsets;
}
}
else
{
MGINFO(log_prefix(pulse_height, context) << "We are not a pulse validator. Waiting for next pulse round or block.");
round_state = thread_sleep(sleep_until::next_block_or_round, state, blockchain, context, pulse_mutex, pulse_height);
}
}
break;
@ -496,6 +525,8 @@ void pulse::main(pulse::state &state, void *quorumnet_state, cryptonote::core &c
case round_state::wait_for_handshakes:
{
assert(!context.round_starts.is_producer);
assert(context.round_starts.my_quorum_position < context.wait_for_other_validator_handshake_bitsets.received_bitsets.size());
bool timed_out = pulse::clock::now() >= context.wait_for_handshakes.end_time;
bool all_handshakes = context.wait_for_handshakes.all_received();
@ -503,7 +534,9 @@ void pulse::main(pulse::state &state, void *quorumnet_state, cryptonote::core &c
{
assert(context.round_starts.my_quorum_position < context.wait_for_other_validator_handshake_bitsets.received_bitsets.size());
std::bitset<8 * sizeof(context.wait_for_handshakes.validator_bits)> bitset = context.wait_for_handshakes.validator_bits;
context.wait_for_other_validator_handshake_bitsets.received_bitsets[context.round_starts.my_quorum_position] = context.wait_for_handshakes.validator_bits;
context.wait_for_other_validator_handshake_bitsets.received_bitsets_count++;
bool missing_handshakes = timed_out && !all_handshakes;
MGINFO(log_prefix(pulse_height, context) << "Collected validator handshakes " << bitset << (missing_handshakes ? ", we timed out and some handshakes were not seen! " : ". ") << "Sending handshake bitset and collecting other validator bitsets.");
@ -557,9 +590,17 @@ void pulse::main(pulse::state &state, void *quorumnet_state, cryptonote::core &c
// Less than 60% of the validators can't come to agreement
// about which validators are online, we wait until the
// next round.
MGINFO(log_prefix(pulse_height, context)
<< "We heard back from less than " << count_threshold << " of the validators (" << count << "/"
<< max_bitsets << ", waiting for next round.");
if (most_common_bitset == 0)
{
MGINFO(log_prefix(pulse_height, context) << count << "/" << max_bitsets << " validators did not send any handshake bitset or sent an empty handshake bitset");
}
else
{
MGINFO(log_prefix(pulse_height, context)
<< "We heard back from less than " << count_threshold << " of the validators (" << count << "/"
<< max_bitsets << ", waiting for next round.");
}
round_state = thread_sleep(sleep_until::next_block_or_round, state, blockchain, context, pulse_mutex, pulse_height);
}
else
@ -567,12 +608,11 @@ void pulse::main(pulse::state &state, void *quorumnet_state, cryptonote::core &c
std::bitset<8 * sizeof(most_common_bitset)> bitset = most_common_bitset;
MGINFO(log_prefix(pulse_height, context) << count << "/" << max_bitsets << " validators agreed on the participating nodes in the quorum " << bitset << (context.round_starts.is_producer ? "" : ". Awaiting block template from block producer"));
context.submit_block_template.validator_bitset = most_common_bitset;
if (context.round_starts.is_producer)
round_state = context.round_starts.is_producer ? round_state::submit_block_template : round_state::wait_next_block;
round_state = round_state::submit_block_template;
else
{
round_state = thread_sleep(sleep_until::next_block_or_round, state, blockchain, context, pulse_mutex, pulse_height);
}
round_state = round_state::wait_for_block_template;
}
}
}
@ -601,6 +641,7 @@ void pulse::main(pulse::state &state, void *quorumnet_state, cryptonote::core &c
MGINFO(log_prefix(pulse_height, context) << "Validators are handshaken and ready, sending block template from producer (us) to validators.\n");
service_nodes::payout block_producer_payouts = service_nodes::service_node_info_to_payout(key.pub, *info);
cryptonote::block block = {};
uint64_t expected_reward = 0;
blockchain.create_next_pulse_block_template(block, block_producer_payouts, pulse_height, expected_reward);
@ -608,10 +649,29 @@ void pulse::main(pulse::state &state, void *quorumnet_state, cryptonote::core &c
block.pulse.round = context.wait_next_block.round;
block.pulse.validator_participation_bits = context.submit_block_template.validator_bitset;
cryptonote::block_verification_context bvc = {};
core.handle_block_found(block, bvc);
std::string block_blob = cryptonote::t_serializable_object_to_blob(block);
crypto::hash hash = crypto::cn_fast_hash(block_blob.data(), block_blob.size());
round_state = round_state::wait_next_block;
crypto::signature block_signature = {};
crypto::generate_signature(hash, core.get_service_keys().pub, core.get_service_keys().key, block_signature);
cryptonote::quorumnet_send_pulse_block_template(quorumnet_state, std::move(block_blob), block_signature, context.wait_for_round.quorum);
round_state = thread_sleep(sleep_until::next_block, state, blockchain, context, pulse_mutex, pulse_height);
}
break;
case round_state::wait_for_block_template:
{
bool timed_out = pulse::clock::now() >= context.wait_for_block_template.end_time;
if (timed_out || context.wait_for_block_template.received)
{
if (context.wait_for_block_template.received)
MGINFO(log_prefix(pulse_height, context) << "Valid block received: " << cryptonote::obj_to_json_str(context.wait_for_block_template.block));
else
MGINFO(log_prefix(pulse_height, context) << "Block template not received");
round_state = thread_sleep(sleep_until::next_block, state, blockchain, context, pulse_mutex, pulse_height);
}
}
break;
}
@ -628,12 +688,16 @@ void pulse::main(pulse::state &state, void *quorumnet_state, cryptonote::core &c
case round_state::wait_for_other_validator_handshake_bitsets:
pump_messages_until = context.wait_for_other_validator_handshake_bitsets.end_time;
break;
case round_state::wait_for_block_template:
pump_messages_until = context.wait_for_block_template.end_time;
break;
}
if (auto now = pulse::clock::now(); now < pump_messages_until)
{
auto duration = std::chrono::duration_cast<std::chrono::seconds>(pump_messages_until - now);
MGINFO(log_prefix(pulse_height, context) << "Pumping messages from quorumnet for " << duration.count() << "s or until all messages received.");
MGINFO(log_prefix(pulse_height, context) << "Pumping messages for '" << round_state_string(round_state) << "' from quorumnet for " << duration.count() << "s or until all messages received.");
}
pulse::message msg = {};
@ -642,80 +706,128 @@ void pulse::main(pulse::state &state, void *quorumnet_state, cryptonote::core &c
bool relay_message = false;
bool finish_pumping = false;
if (msg.quorum_position >= static_cast<int>(context.wait_for_round.quorum.validators.size()))
if (msg.type == pulse::message_type::handshake || msg.type == pulse::message_type::handshake_bitset)
{
MERROR(log_prefix(pulse_height, context) << "Quorum position " << msg.quorum_position << " in Pulse message indexes oob");
continue;
if (msg.handshakes.quorum_position >= static_cast<int>(context.wait_for_round.quorum.validators.size()))
{
MERROR(log_prefix(pulse_height, context) << "Quorum position " << msg.handshakes.quorum_position << " in Pulse message indexes oob");
continue;
}
crypto::public_key const &validator_key = context.wait_for_round.quorum.validators[msg.handshakes.quorum_position];
crypto::hash hash = {};
if (msg.type == pulse::message_type::handshake)
{
auto buf = tools::memcpy_le(top_hash.data, msg.handshakes.quorum_position);
hash = crypto::cn_fast_hash(buf.data(), buf.size());
}
else
{
auto buf = tools::memcpy_le(msg.handshakes.validator_bitset, top_hash.data, msg.handshakes.quorum_position);
hash = crypto::cn_fast_hash(buf.data(), buf.size());
}
if (!crypto::check_signature(hash, validator_key, msg.handshakes.signature))
{
MERROR(log_prefix(pulse_height, context) << "Signature from pulse handshake bit does not validate with node " << msg.handshakes.quorum_position << ":" << lokimq::to_hex(tools::view_guts(validator_key)) << ", at height " << pulse_height << "; Validator signing outdated height or bad handshake data");
continue;
}
}
crypto::public_key const &validator_key = context.wait_for_round.quorum.validators[msg.quorum_position];
if (msg.type == pulse::message_type::handshake)
switch(msg.type)
{
// TODO(doyle): We need some lenience in time for accepting early
// handshakes in case clocks are slightly out of sync.
if (!msg_time_check(pulse_height, context, msg, pulse::clock::now(), context.wait_for_handshakes.start_time, context.wait_for_handshakes.end_time))
continue;
default: assert(msg.type == pulse::message_type::invalid); break;
// TODO(doyle): DRY
// NOTE: Validate Signature
case pulse::message_type::handshake:
{
auto buf = tools::memcpy_le(top_hash.data, msg.quorum_position);
crypto::hash hash;
crypto::cn_fast_hash(buf.data(), buf.size(), hash);
if (!crypto::check_signature(hash, validator_key, msg.signature))
{
MERROR(log_prefix(pulse_height, context) << "Signature from pulse handshake bit does not validate with node " << msg.quorum_position << ":" << lokimq::to_hex(tools::view_guts(validator_key)) << ", at height " << pulse_height << "; Validator signing outdated height or bad handshake data");
// TODO(doyle): We need some lenience in time for accepting early
// handshakes in case clocks are slightly out of sync.
if (!msg_time_check(pulse_height, context, msg, pulse::clock::now(), context.wait_for_handshakes.start_time, context.wait_for_handshakes.end_time))
continue;
uint16_t quorum_position_bit = 1 << msg.handshakes.quorum_position;
relay_message = ((context.wait_for_handshakes.validator_bits & quorum_position_bit) == 0);
context.wait_for_handshakes.validator_bits |= quorum_position_bit;
finish_pumping = context.wait_for_handshakes.all_received();
if (relay_message) // First time seen handshake
{
auto position_bitset = std::bitset<sizeof(quorum_position_bit) * 8>(quorum_position_bit);
auto validator_bitset = std::bitset<sizeof(quorum_position_bit) * 8>(context.wait_for_handshakes.validator_bits);
MGINFO(log_prefix(pulse_height, context) << "Received handshake with quorum position bit (" << msg.handshakes.quorum_position <<") " << position_bitset << " saved to bitset " << validator_bitset);
}
}
break;
uint16_t quorum_position_bit = 1 << msg.quorum_position;
relay_message = ((context.wait_for_handshakes.validator_bits & quorum_position_bit) == 0);
context.wait_for_handshakes.validator_bits |= quorum_position_bit;
finish_pumping = context.wait_for_handshakes.all_received();
if (relay_message) // First time seen handshake
case pulse::message_type::handshake_bitset:
{
auto position_bitset = std::bitset<sizeof(quorum_position_bit) * 8>(quorum_position_bit);
auto validator_bitset = std::bitset<sizeof(quorum_position_bit) * 8>(context.wait_for_handshakes.validator_bits);
MGINFO(log_prefix(pulse_height, context) << "Handshake message with quorum position bit (" << msg.quorum_position <<") " << position_bitset << " saved to bitset " << validator_bitset);
if (!msg_time_check(pulse_height, context, msg, pulse::clock::now(), context.wait_for_handshakes.start_time, context.wait_for_other_validator_handshake_bitsets.end_time))
continue;
uint16_t prev_bitset = context.wait_for_other_validator_handshake_bitsets.received_bitsets[msg.handshakes.quorum_position];
if (!context.round_starts.is_producer)
relay_message = (prev_bitset != msg.handshakes.validator_bitset);
context.wait_for_other_validator_handshake_bitsets.received_bitsets[msg.handshakes.quorum_position] = msg.handshakes.validator_bitset;
if (prev_bitset == 0)
context.wait_for_other_validator_handshake_bitsets.received_bitsets_count++;
finish_pumping = context.wait_for_other_validator_handshake_bitsets.all_received();
}
}
else if (msg.type == pulse::message_type::handshake_bitset)
{
if (!msg_time_check(pulse_height, context, msg, pulse::clock::now(), context.wait_for_handshakes.start_time, context.wait_for_other_validator_handshake_bitsets.end_time))
continue;
break;
// NOTE: Validate Signature
case pulse::message_type::block_template:
{
auto buf = tools::memcpy_le(msg.validator_bitset, top_hash.data, msg.quorum_position);
crypto::hash hash;
crypto::cn_fast_hash(buf.data(), buf.size(), hash);
assert(!context.round_starts.is_producer);
if (!crypto::check_signature(hash, validator_key, msg.signature))
if (!msg_time_check(pulse_height, context, msg, pulse::clock::now(), context.wait_for_handshakes.start_time, context.wait_for_block_template.end_time))
continue;
// TODO(doyle): Time check
cryptonote::block block = {};
if (!cryptonote::t_serializable_object_from_blob(block, msg.block_template.blob))
{
MERROR(log_prefix(pulse_height, context) << "Signature from pulse handshake bitset does not validate with node " << msg.quorum_position << ":" << lokimq::to_hex(tools::view_guts(validator_key)) << ", at height " << pulse_height << "; Validator signing outdated height or bad handshake data");
MGINFO(log_prefix(pulse_height, context) << "Received unparsable pulse block template blob");
continue;
}
crypto::public_key const &block_producer = context.wait_for_round.quorum.workers[0];
crypto::hash hash = crypto::cn_fast_hash(msg.block_template.blob.data(), msg.block_template.blob.size());
if (!crypto::check_signature(hash, block_producer, msg.block_template.signature))
{
MGINFO(log_prefix(pulse_height, context) << "Received pulse block template not signed by the block producer");
continue;
}
if (block.pulse.round != context.wait_next_block.round)
{
MGINFO(log_prefix(pulse_height, context) << "Received pulse block template specifying different round " << +block.pulse.round << ", expected " << +context.wait_next_block.round);
continue;
}
if (block.pulse.validator_participation_bits != context.submit_block_template.validator_bitset)
{
auto block_bitset = std::bitset<sizeof(block.pulse.validator_participation_bits) * 8>(block.pulse.validator_participation_bits);
auto our_bitset = std::bitset<sizeof(block.pulse.validator_participation_bits) * 8>(context.submit_block_template.validator_bitset);
MGINFO(log_prefix(pulse_height, context) << "Received pulse block template specifying different validator handshake bitsets " << block_bitset << ", expected " << our_bitset);
continue;
}
if (!context.wait_for_block_template.received)
{
context.wait_for_block_template.received = true;
context.wait_for_block_template.block = std::move(block);
relay_message = true;
}
finish_pumping = true;
}
uint16_t prev_bitset = context.wait_for_other_validator_handshake_bitsets.received_bitsets[msg.quorum_position];
relay_message = prev_bitset != msg.validator_bitset;
context.wait_for_other_validator_handshake_bitsets.received_bitsets[msg.quorum_position] = msg.validator_bitset;
if (prev_bitset == 0)
context.wait_for_other_validator_handshake_bitsets.received_bitsets_count++;
finish_pumping = context.wait_for_other_validator_handshake_bitsets.all_received();
}
else
{
assert(msg.type == pulse::message_type::invalid);
break;
}
if (relay_message)
cryptonote::quorumnet_pulse_relay_message_to_quorum(quorumnet_state, msg, context.wait_for_round.quorum);
cryptonote::quorumnet_pulse_relay_message_to_quorum(quorumnet_state, msg, context.wait_for_round.quorum, context.round_starts.is_producer);
if (finish_pumping)
break;

View File

@ -31,6 +31,7 @@ enum struct message_type : uint8_t
invalid,
handshake,
handshake_bitset,
block_template,
};
constexpr std::string_view message_type_string(message_type type)
@ -47,10 +48,20 @@ constexpr std::string_view message_type_string(message_type type)
struct message
{
message_type type;
uint16_t quorum_position;
uint16_t validator_bitset;
crypto::signature signature;
message_type type;
struct
{
uint16_t quorum_position;
uint16_t validator_bitset; // Set if type is handshake_bitset, otherwise 0.
crypto::signature signature;
} handshakes;
struct
{
std::string blob;
crypto::signature signature;
} block_template;
};
struct state : public cryptonote::BlockAddedHook

View File

@ -8,24 +8,26 @@
namespace service_nodes {
constexpr size_t PULSE_QUORUM_ENTROPY_LAG = 21; // How many blocks back from the tip of the Blockchain to source entropy for the Pulse quorums.
#if defined(LOKI_ENABLE_INTEGRATION_TEST_HOOKS)
constexpr auto PULSE_TIME_PER_BLOCK = std::chrono::seconds(5);
constexpr auto PULSE_WAIT_FOR_HANDSHAKES_DURATION = std::chrono::seconds(2);
constexpr auto PULSE_WAIT_FOR_OTHER_VALIDATOR_HANDSHAKES_DURATION = std::chrono::seconds(3);
constexpr auto PULSE_TIME_PER_BLOCK = std::chrono::seconds(20);
constexpr auto PULSE_WAIT_FOR_HANDSHAKES_DURATION = std::chrono::seconds(7);
constexpr auto PULSE_WAIT_FOR_OTHER_VALIDATOR_HANDSHAKES_DURATION = std::chrono::seconds(7);
constexpr auto PULSE_WAIT_FOR_BLOCK_TEMPLATE_DURATION = std::chrono::seconds(6);
constexpr size_t PULSE_QUORUM_NUM_VALIDATORS = 0;
constexpr size_t PULSE_BLOCK_REQUIRED_SIGNATURES = 0;
#else
constexpr auto PULSE_TIME_PER_BLOCK = std::chrono::minutes(2);
constexpr auto PULSE_WAIT_FOR_HANDSHAKES_DURATION = std::chrono::minutes(1);
constexpr auto PULSE_WAIT_FOR_OTHER_VALIDATOR_HANDSHAKES_DURATION = std::chrono::minutes(1);
constexpr auto PULSE_TIME_PER_BLOCK = std::chrono::seconds(20);
constexpr auto PULSE_WAIT_FOR_HANDSHAKES_DURATION = std::chrono::seconds(7);
constexpr auto PULSE_WAIT_FOR_OTHER_VALIDATOR_HANDSHAKES_DURATION = std::chrono::seconds(7);
constexpr auto PULSE_WAIT_FOR_BLOCK_TEMPLATE_DURATION = std::chrono::seconds(6);
constexpr size_t PULSE_QUORUM_NUM_VALIDATORS = 11;
constexpr size_t PULSE_QUORUM_NUM_VALIDATORS = 7;
constexpr size_t PULSE_BLOCK_REQUIRED_SIGNATURES = 7; // A block must have exactly N signatures to be considered properly
#endif
constexpr size_t PULSE_QUORUM_SIZE = PULSE_QUORUM_NUM_VALIDATORS + 1 /*Leader*/;
constexpr size_t PULSE_QUORUM_SIZE = PULSE_QUORUM_NUM_VALIDATORS + 1 /*Leader*/;
static_assert(PULSE_TIME_PER_BLOCK ==
PULSE_WAIT_FOR_HANDSHAKES_DURATION + PULSE_WAIT_FOR_OTHER_VALIDATOR_HANDSHAKES_DURATION);
PULSE_WAIT_FOR_HANDSHAKES_DURATION + PULSE_WAIT_FOR_OTHER_VALIDATOR_HANDSHAKES_DURATION + PULSE_WAIT_FOR_BLOCK_TEMPLATE_DURATION);
static_assert(PULSE_QUORUM_NUM_VALIDATORS >= PULSE_BLOCK_REQUIRED_SIGNATURES);
static_assert(PULSE_QUORUM_ENTROPY_LAG >= PULSE_QUORUM_SIZE, "We need to pull atleast PULSE_QUORUM_SIZE number of blocks from the Blockchain, we can't if the amount of blocks to go back from the tip of the Blockchain is less than the blocks we need.");

View File

@ -140,6 +140,77 @@ E get_enum(const bt_dict &d, const std::string &key) {
throw std::invalid_argument("invalid enum value for field " + key);
}
struct prepared_relay_destinations
{
std::string x25519_string;
std::string connect_string;
};
// Relay data to a random subset of the quorum up to num_peers. If the sender is
// a validator in the quorum, prefer peer_info to get a fully connected relay
// with redundancy.
// Returns the number of peers it actually prepared relay destinations.
template <typename It>
std::vector<prepared_relay_destinations>
peer_prepare_relay_to_quorum_subset(cryptonote::core &core, It quorum_begin, It quorum_end, size_t num_peers) {
// Lookup the x25519 and ZMQ connection string for all possible blink recipients so that we
// know where to send it to, and so that we can immediately exclude SNs that aren't active
// anymore.
std::unordered_set<crypto::public_key> candidates;
for (auto it = quorum_begin; it != quorum_end; it++)
candidates.insert((*it)->validators.begin(), (*it)->validators.end());
MDEBUG("Have " << candidates.size() << " SN candidates");
std::vector<std::tuple<std::string, std::string, decltype(proof_info{}.version)>> remotes; // {x25519 pubkey, connect string, version}
remotes.reserve(candidates.size());
core.get_service_node_list().for_each_service_node_info_and_proof(candidates.begin(), candidates.end(),
[&remotes](const auto &pubkey, const auto &info, const auto &proof) {
if (!info.is_active()) {
MTRACE("Not include inactive node " << pubkey);
return;
}
if (!proof.pubkey_x25519 || !proof.quorumnet_port || !proof.public_ip) {
MTRACE("Not including node " << pubkey << ": missing x25519(" << to_hex(get_data_as_string(proof.pubkey_x25519)) << "), "
"public_ip(" << epee::string_tools::get_ip_string_from_int32(proof.public_ip) << "), or qnet port(" << proof.quorumnet_port << ")");
return;
}
remotes.emplace_back(get_data_as_string(proof.pubkey_x25519),
"tcp://" + epee::string_tools::get_ip_string_from_int32(proof.public_ip) + ":" + std::to_string(proof.quorumnet_port),
proof.version);
});
// Select 4 random SNs to send the data to, but prefer SNs with newer versions because they may have network fixes.
MDEBUG("Have " << remotes.size() << " candidates after checking active status and connection details");
std::vector<size_t> indices(remotes.size());
std::iota(indices.begin(), indices.end(), 0);
std::shuffle(indices.begin(), indices.end(), tools::rng);
// Stable sort by version so that we keep the shuffled order within a version
using std::get;
std::stable_sort(indices.begin(), indices.end(), [&remotes](size_t a, size_t b) {
return get<2>(remotes[a]) > get<2>(remotes[b]); });
if (indices.size() > num_peers)
indices.resize(num_peers);
std::vector<prepared_relay_destinations> result;
result.reserve(indices.size());
for (size_t i : indices)
result.push_back({std::move(get<0>(remotes[i])), std::move(get<1>(remotes[i]))});
return result;
}
void peer_relay_to_prepared_destinations(cryptonote::core &core, std::vector<prepared_relay_destinations> const &destinations, std::string_view command, std::string &&data)
{
for (auto const &[x25519_string, connect_string]: destinations) {
MINFO("Relaying data to " << to_hex(x25519_string) << " @ " << connect_string);
core.get_lmq().send(x25519_string, command, std::move(data), send_option::hint{connect_string});
}
}
/// Helper class to calculate and relay to peers of quorums.
///
/// TODO: add a wrapper that caches this so that looking up the same quorum peers within a certain
@ -217,7 +288,7 @@ public:
if (include_workers) {
auto &w = (*qit)->workers;
for (size_t i = 0; i < w.size(); i++) {
if (!exclude.count(w[i])) add_peer(w[i]);
if (!exclude.count(w[i])) need_remotes.insert(w[i]);
}
}
}
@ -1223,50 +1294,6 @@ std::future<std::pair<cryptonote::blink_result, std::string>> send_blink(crypton
uint64_t checksum;
auto quorums = get_blink_quorums(height, core.get_service_node_list(), nullptr, &checksum);
// Lookup the x25519 and ZMQ connection string for all possible blink recipients so that we
// know where to send it to, and so that we can immediately exclude SNs that aren't active
// anymore.
std::unordered_set<crypto::public_key> candidates;
for (auto &q : quorums)
candidates.insert(q->validators.begin(), q->validators.end());
MDEBUG("Have " << candidates.size() << " blink SN candidates");
std::vector<std::tuple<std::string, std::string, decltype(proof_info{}.version)>> remotes; // {x25519 pubkey, connect string, version}
remotes.reserve(candidates.size());
core.get_service_node_list().for_each_service_node_info_and_proof(candidates.begin(), candidates.end(),
[&remotes](const auto &pubkey, const auto &info, const auto &proof) {
if (!info.is_active()) {
MTRACE("Not include inactive node " << pubkey);
return;
}
if (!proof.pubkey_x25519 || !proof.quorumnet_port || !proof.public_ip) {
MTRACE("Not including node " << pubkey << ": missing x25519(" << to_hex(get_data_as_string(proof.pubkey_x25519)) << "), "
"public_ip(" << epee::string_tools::get_ip_string_from_int32(proof.public_ip) << "), or qnet port(" << proof.quorumnet_port << ")");
return;
}
remotes.emplace_back(get_data_as_string(proof.pubkey_x25519),
"tcp://" + epee::string_tools::get_ip_string_from_int32(proof.public_ip) + ":" + std::to_string(proof.quorumnet_port),
proof.version);
});
MDEBUG("Have " << remotes.size() << " blink SN candidates after checking active status and connection details");
// Select 4 random (active) blink quorum SNs to send the blink to, but prefer SNs with newer
// versions because they may have blink-related fixes.
std::vector<size_t> indices(remotes.size());
std::iota(indices.begin(), indices.end(), 0);
std::shuffle(indices.begin(), indices.end(), tools::rng);
// Stable sort by version so that we keep the shuffled order within a version
using std::get;
std::stable_sort(indices.begin(), indices.end(), [&remotes](size_t a, size_t b) {
return get<2>(remotes[a]) > get<2>(remotes[b]); });
if (indices.size() > 4)
indices.resize(4);
brd->remote_count = indices.size();
std::string data = bt_serialize<bt_dict>({
{"!", blink_tag},
{"#", get_data_as_string(tx_hash)},
@ -1275,10 +1302,10 @@ std::future<std::pair<cryptonote::blink_result, std::string>> send_blink(crypton
{"t", tx_blob}
});
for (size_t i : indices) {
MINFO("Relaying blink tx to " << to_hex(get<0>(remotes[i])) << " @ " << get<1>(remotes[i]));
core.get_lmq().send(get<0>(remotes[i]), "blink.submit", data, send_option::hint{get<1>(remotes[i])});
}
auto destinations = peer_prepare_relay_to_quorum_subset(core, quorums.begin(), quorums.end(), 4 /*num_peers*/);
brd->remote_count = destinations.size();
peer_relay_to_prepared_destinations(core, destinations, "blink.submit"sv, std::move(data));
} catch (...) {
std::unique_lock lock{pending_blink_result_mutex};
auto it = pending_blink_results.find(blink_tag); // Look up again because `brd` might have been deleted
@ -1398,12 +1425,15 @@ void handle_blink_success(Message& m) {
const std::string PULSE_TAG_VALIDATOR_BITSET = "b";
const std::string PULSE_TAG_QUORUM_POSITION = "q";
const std::string PULSE_TAG_SIGNATURE = "s";
const std::string PULSE_TAG_BLOCK_TEMPLATE = "t";
const std::string PULSE_CMD_CATEGORY = "pulse";
const std::string PULSE_CMD_VALIDATOR_BITSET = "validator_bitset";
const std::string PULSE_CMD_VALIDATOR_BIT = "validator_bit";
const std::string PULSE_CMD_BLOCK_TEMPLATE = "block_template";
const std::string PULSE_CMD_SEND_VALIDATOR_BITSET = PULSE_CMD_CATEGORY + "." + PULSE_CMD_VALIDATOR_BITSET;
const std::string PULSE_CMD_SEND_VALIDATOR_BIT = PULSE_CMD_CATEGORY + "." + PULSE_CMD_VALIDATOR_BIT;
const std::string PULSE_CMD_SEND_BLOCK_TEMPLATE = PULSE_CMD_CATEGORY + "." + PULSE_CMD_BLOCK_TEMPLATE;
bt_dict pulse_serialize_message(pulse::message const &msg)
{
@ -1411,6 +1441,7 @@ bt_dict pulse_serialize_message(pulse::message const &msg)
switch(msg.type)
{
default:
case pulse::message_type::invalid:
{
assert(!"Invalid Code Path");
@ -1419,16 +1450,23 @@ bt_dict pulse_serialize_message(pulse::message const &msg)
case pulse::message_type::handshake:
{
result = {{PULSE_TAG_QUORUM_POSITION, msg.quorum_position},
{PULSE_TAG_SIGNATURE, tools::view_guts(msg.signature)}};
result = {{PULSE_TAG_QUORUM_POSITION, msg.handshakes.quorum_position},
{PULSE_TAG_SIGNATURE, tools::view_guts(msg.handshakes.signature)}};
}
break;
case pulse::message_type::handshake_bitset:
{
result = {{PULSE_TAG_VALIDATOR_BITSET, msg.validator_bitset},
{PULSE_TAG_QUORUM_POSITION, msg.quorum_position},
{PULSE_TAG_SIGNATURE, tools::view_guts(msg.signature)}};
result = {{PULSE_TAG_VALIDATOR_BITSET, msg.handshakes.validator_bitset},
{PULSE_TAG_QUORUM_POSITION, msg.handshakes.quorum_position},
{PULSE_TAG_SIGNATURE, tools::view_guts(msg.handshakes.signature)}};
}
break;
case pulse::message_type::block_template:
{
result = {{PULSE_TAG_SIGNATURE, tools::view_guts(msg.block_template.signature)},
{PULSE_TAG_BLOCK_TEMPLATE, msg.block_template.blob}};
}
break;
}
@ -1436,24 +1474,59 @@ bt_dict pulse_serialize_message(pulse::message const &msg)
return result;
}
void pulse_relay_message_to_quorum(void *self, pulse::message const &msg, service_nodes::quorum const &quorum)
void pulse_relay_message_to_quorum(void *self, pulse::message const &msg, service_nodes::quorum const &quorum, bool block_producer)
{
assert(msg.quorum_position < quorum.validators.size());
peer_info::exclude_set relay_exclude;
relay_exclude.insert(quorum.validators[msg.quorum_position]);
bool include_block_producer = msg.type == pulse::message_type::handshake_bitset;
bool include_block_producer = false;
std::string_view command = {};
switch(msg.type)
{
default: break;
case pulse::message_type::block_template:
{
command = PULSE_CMD_SEND_BLOCK_TEMPLATE;
}
break;
case pulse::message_type::handshake: /* FALLTHRU */
case pulse::message_type::handshake_bitset:
{
assert(msg.handshakes.quorum_position < quorum.validators.size());
include_block_producer = msg.type == pulse::message_type::handshake_bitset;
relay_exclude.insert(quorum.validators[msg.handshakes.quorum_position]);
if (msg.type == pulse::message_type::handshake)
command = PULSE_CMD_SEND_VALIDATOR_BIT;
else
{
assert(msg.type == pulse::message_type::handshake_bitset);
command = PULSE_CMD_SEND_VALIDATOR_BITSET;
}
}
break;
}
QnetState &qnet = *static_cast<QnetState *>(self);
peer_info peer_list{qnet,
quorum_type::pulse,
&quorum,
true /*opportunistic*/,
std::move(relay_exclude),
include_block_producer /*include_workers*/};
if (block_producer)
{
service_nodes::quorum const *quorum_ptr = &quorum;
auto destinations = peer_prepare_relay_to_quorum_subset(qnet.core, &quorum_ptr, &quorum_ptr + 1, 4 /*num_peers*/);
peer_relay_to_prepared_destinations(qnet.core, destinations, command, bt_serialize(pulse_serialize_message(msg)));
}
else
{
peer_info peer_list{qnet,
quorum_type::pulse,
&quorum,
true /*opportunistic*/,
std::move(relay_exclude),
include_block_producer /*include_workers*/};
auto command = (msg.type == pulse::message_type::handshake_bitset) ? PULSE_CMD_SEND_VALIDATOR_BITSET : PULSE_CMD_SEND_VALIDATOR_BIT;
peer_list.relay_to_peers(command, pulse_serialize_message(msg));
peer_list.relay_to_peers(command, pulse_serialize_message(msg));
}
}
// Send participation handshake to other validators in the quorum. Caller must
@ -1474,8 +1547,8 @@ void send_pulse_validator_handshake_bit_or_bitset(void *self, bool sending_bitse
{
if (quorum.validators[index] == core.get_service_keys().pub)
{
msg.quorum_position = index;
in_quorum = true;
msg.handshakes.quorum_position = index;
in_quorum = true;
break;
}
}
@ -1486,25 +1559,25 @@ void send_pulse_validator_handshake_bit_or_bitset(void *self, bool sending_bitse
auto command = (sending_bitset) ? PULSE_CMD_SEND_VALIDATOR_BITSET : PULSE_CMD_SEND_VALIDATOR_BIT;
if (sending_bitset)
{
auto buf = tools::memcpy_le(validator_bitset, top_hash.data, msg.quorum_position);
auto buf = tools::memcpy_le(validator_bitset, top_hash.data, msg.handshakes.quorum_position);
crypto::hash hash;
crypto::cn_fast_hash(buf.data(), buf.size(), hash);
crypto::generate_signature(hash, core.get_service_keys().pub, core.get_service_keys().key, msg.signature);
crypto::generate_signature(hash, core.get_service_keys().pub, core.get_service_keys().key, msg.handshakes.signature);
msg.type = pulse::message_type::handshake_bitset;
msg.validator_bitset = validator_bitset;
msg.type = pulse::message_type::handshake_bitset;
msg.handshakes.validator_bitset = validator_bitset;
}
else
{
auto buf = tools::memcpy_le(top_hash.data, msg.quorum_position);
auto buf = tools::memcpy_le(top_hash.data, msg.handshakes.quorum_position);
crypto::hash hash;
crypto::cn_fast_hash(buf.data(), buf.size(), hash);
crypto::generate_signature(hash, core.get_service_keys().pub, core.get_service_keys().key, msg.signature);
crypto::generate_signature(hash, core.get_service_keys().pub, core.get_service_keys().key, msg.handshakes.signature);
msg.type = pulse::message_type::handshake;
}
pulse_relay_message_to_quorum(self, msg, quorum);
pulse_relay_message_to_quorum(self, msg, quorum, false /*block_producer*/);
}
void send_pulse_validator_handshake_bit(void *self, service_nodes::quorum const &quorum, crypto::hash const &top_hash)
@ -1517,6 +1590,19 @@ void send_pulse_validator_handshake_bitset(void *self, service_nodes::quorum con
send_pulse_validator_handshake_bit_or_bitset(self, true /*sending_bitset*/, quorum, top_hash, validator_bitset);
}
void send_pulse_block_template(void *self, std::string &&blob, crypto::signature const &signature, service_nodes::quorum const &quorum)
{
QnetState &qnet = *static_cast<QnetState *>(self);
cryptonote::core &core = qnet.core;
pulse::message msg = {};
msg.type = pulse::message_type::block_template;
msg.block_template.blob = std::move(blob);
msg.block_template.signature = signature;
pulse_relay_message_to_quorum(self, msg, quorum, true /*block_producer*/);
}
// Invoked when daemon has received a participation handshake message via
// QuorumNet from another validator, either forwarded or originating from that
// node. The message is added to the Pulse message queue and validating the
@ -1576,13 +1662,13 @@ void handle_pulse_participation_bit_or_bitset(Message &m, QnetState& qnet, bool
assert(signature);
}
pulse::message msg = {};
msg.signature = signature;
msg.quorum_position = quorum_position;
pulse::message msg = {};
msg.handshakes.signature = signature;
msg.handshakes.quorum_position = quorum_position;
if (bitset)
{
msg.type = pulse::message_type::handshake_bitset;
msg.validator_bitset = validator_bitset;
msg.type = pulse::message_type::handshake_bitset;
msg.handshakes.validator_bitset = validator_bitset;
}
else
{
@ -1596,6 +1682,37 @@ void handle_pulse_participation_bit_or_bitset(Message &m, QnetState& qnet, bool
qnet.pulse_message_queue_cv.notify_one();
}
void handle_pulse_block_template(Message &m, QnetState &qnet)
{
if (m.data.size() != 1)
throw std::runtime_error(std::string("Rejecting pulse block template expected one data entry not ") + std::to_string(m.data.size()));
bt_dict_consumer data{m.data[0]};
pulse::message msg = {};
msg.type = pulse::message_type::block_template;
{
std::string_view INVALID_ARG_PREFIX = "Invalid pulse block template: missing required field '"sv;
if (data.skip_until(PULSE_TAG_SIGNATURE)) {
auto sig_str = data.consume_string_view();
msg.block_template.signature = convert_string_view_bytes_to_signature(sig_str);
} else {
throw std::invalid_argument(std::string(INVALID_ARG_PREFIX) + std::string(PULSE_TAG_SIGNATURE) + "'");
}
if (data.skip_until(PULSE_TAG_BLOCK_TEMPLATE))
msg.block_template.blob = data.consume_string_view();
else
throw std::invalid_argument(std::string(INVALID_ARG_PREFIX) + std::string(PULSE_TAG_QUORUM_POSITION) + "'");
}
{
std::unique_lock lock{qnet.pulse_message_queue_mutex};
qnet.pulse_message_queue.push(std::move(msg));
}
qnet.pulse_message_queue_cv.notify_one();
}
bool pulse_message_queue_pump_messages(void *self, pulse::message &msg, pulse::time_point sleep_until)
{
auto qnet = static_cast<QnetState *>(self);
@ -1632,6 +1749,7 @@ void init_core_callbacks() {
cryptonote::quorumnet_send_pulse_validator_handshake_bit = send_pulse_validator_handshake_bit;
cryptonote::quorumnet_send_pulse_validator_handshake_bitset = send_pulse_validator_handshake_bitset;
cryptonote::quorumnet_send_pulse_block_template = send_pulse_block_template;
cryptonote::quorumnet_pulse_pump_messages = pulse_message_queue_pump_messages;
@ -1678,6 +1796,7 @@ void setup_endpoints(QnetState& qnet) {
lmq.add_category(PULSE_CMD_CATEGORY, Access{AuthLevel::none, true /*remote sn*/, true /*local sn*/}, 1 /*reserved thread*/)
.add_command(PULSE_CMD_VALIDATOR_BIT, [&qnet](Message& m) { handle_pulse_participation_bit_or_bitset(m, qnet, false /*bitset*/); })
.add_command(PULSE_CMD_VALIDATOR_BITSET, [&qnet](Message& m) { handle_pulse_participation_bit_or_bitset(m, qnet, true /*bitset*/); })
.add_command(PULSE_CMD_BLOCK_TEMPLATE, [&qnet](Message& m) { handle_pulse_block_template(m, qnet); })
;
// Compatibility aliases. No longer used since 7.1.4, but can still be received from previous