Pulse: Add main block gen/validator handshake synchronizing worker

This commit is contained in:
Doyle 2020-07-27 15:54:02 +10:00
parent 73ea959c8d
commit 2232ecb46b
7 changed files with 847 additions and 14 deletions

View File

@ -39,7 +39,8 @@ add_library(cryptonote_core
loki_name_system.cpp
tx_pool.cpp
tx_sanity_check.cpp
cryptonote_tx_utils.cpp)
cryptonote_tx_utils.cpp
pulse.cpp)
target_link_libraries(cryptonote_core
PUBLIC

View File

@ -68,7 +68,6 @@ extern "C" {
#include "memwipe.h"
#include "common/i18n.h"
#include "net/local_ip.h"
#include "cryptonote_protocol/quorumnet.h"
#include "common/loki_integration_test_hooks.h"
@ -277,6 +276,13 @@ namespace cryptonote
quorumnet_relay_obligation_votes_proc *quorumnet_relay_obligation_votes = [](void*, const std::vector<service_nodes::quorum_vote_t>&) { need_core_init(); };
quorumnet_send_blink_proc *quorumnet_send_blink = [](core&, const std::string&) -> std::future<std::pair<blink_result, std::string>> { need_core_init(); };
quorumnet_send_pulse_validator_handshake_bit_proc *quorumnet_send_pulse_validator_handshake_bit = [](void *, service_nodes::quorum const &, crypto::hash const &) -> void { need_core_init(); };
quorumnet_send_pulse_validator_handshake_bitset_proc *quorumnet_send_pulse_validator_handshake_bitset = [](void *, service_nodes::quorum const &, crypto::hash const &, uint16_t) -> void { need_core_init(); };
quorumnet_pulse_pump_messages_proc *quorumnet_pulse_pump_messages = [](void*, pulse::message &, pulse::time_point) -> bool { need_core_init(); return false; };
quorumnet_pulse_relay_message_to_quorum_proc *quorumnet_pulse_relay_message_to_quorum = [](void *, pulse::message const &, service_nodes::quorum const &) -> void { need_core_init(); };
//-----------------------------------------------------------------------------------------------
core::core()
: m_mempool(m_blockchain_storage)
@ -1077,7 +1083,18 @@ namespace cryptonote
void core::start_lokimq() {
update_lmq_sns(); // Ensure we have SNs set for the current block before starting
m_lmq->start();
if (m_service_node)
{
m_blockchain_storage.hook_block_added(m_pulse_state);
lokimq::TaggedThreadID pulse_thread_id = m_lmq->add_tagged_thread("pulse");
m_lmq->start();
m_lmq->job([this]() { pulse::main(m_pulse_state, m_quorumnet_state, *this); }, pulse_thread_id);
}
else
{
m_lmq->start();
}
}
//-----------------------------------------------------------------------------------------------

View File

@ -47,10 +47,12 @@
#include "service_node_voting.h"
#include "service_node_list.h"
#include "service_node_quorum_cop.h"
#include "pulse.h"
#include "cryptonote_basic/miner.h"
#include "cryptonote_basic/connection_context.h"
#include "warnings.h"
#include "crypto/hash.h"
#include "cryptonote_protocol/quorumnet.h"
PUSH_WARNINGS
DISABLE_VS_WARNINGS(4355)
@ -96,11 +98,25 @@ namespace cryptonote
// rpc/http_server.cpp's init_options().
extern void (*long_poll_trigger)(tx_memory_pool& pool);
// TODO(doyle)
using quorumnet_send_pulse_validator_handshake_bit_proc = void (void *self, service_nodes::quorum const &quorum, crypto::hash const &top_hash);
using quorumnet_send_pulse_validator_handshake_bitset_proc = void (void *self, service_nodes::quorum const &quorum, crypto::hash const &top_hash, uint16_t handshake_bitset);
using quorumnet_pulse_pump_messages_proc = bool (void *self, pulse::message &msg, pulse::time_point end_time);
using quorumnet_pulse_relay_message_to_quorum_proc = void (void *, pulse::message const &msg, service_nodes::quorum const &quorum);
extern quorumnet_new_proc *quorumnet_new;
extern quorumnet_delete_proc *quorumnet_delete;
extern quorumnet_relay_obligation_votes_proc *quorumnet_relay_obligation_votes;
extern quorumnet_send_blink_proc *quorumnet_send_blink;
extern quorumnet_send_pulse_validator_handshake_bit_proc *quorumnet_send_pulse_validator_handshake_bit;
extern quorumnet_send_pulse_validator_handshake_bitset_proc *quorumnet_send_pulse_validator_handshake_bitset;
extern quorumnet_pulse_pump_messages_proc *quorumnet_pulse_pump_messages;
extern quorumnet_pulse_relay_message_to_quorum_proc *quorumnet_pulse_relay_message_to_quorum;
/************************************************************************/
/* */
/************************************************************************/
@ -1231,6 +1247,8 @@ namespace cryptonote
bool building = false;
uint64_t height = 0, emissions = 0, fees = 0, burnt = 0;
} m_coinbase_cache;
pulse::state m_pulse_state{};
};
}

View File

@ -0,0 +1,508 @@
#include <array>
#include <mutex>
#include <chrono>
#include "misc_log_ex.h"
#include "cryptonote_core.h"
#include "cryptonote_basic/hardfork.h"
#include "service_node_list.h"
#include "service_node_quorum_cop.h"
#include "service_node_rules.h"
enum struct round_state
{
wait_next_block,
wait_for_round,
wait_for_handshakes,
wait_for_other_validator_handshake_bitsets,
submit_block_template,
};
struct round_context
{
struct
{
pulse::time_point round_0_start_time;
uint8_t round;
} wait_next_block;
struct
{
service_nodes::quorum quorum;
bool producer;
} wait_for_round;
struct
{
uint16_t validator_bits;
pulse::time_point end_time;
bool all_received() { return validator_bits == service_nodes::pulse_validator_bit_mask(); }
} wait_for_handshakes;
struct
{
std::array<uint16_t, service_nodes::PULSE_QUORUM_NUM_VALIDATORS> received_bitsets;
int received_bitsets_count;
pulse::time_point end_time;
bool all_received() { return received_bitsets_count == service_nodes::PULSE_QUORUM_NUM_VALIDATORS; }
} wait_for_other_validator_handshake_bitsets;
struct
{
uint16_t validator_bitset;
} submit_block_template;
};
namespace
{
round_state sleep_until_next_block_or_round(cryptonote::Blockchain const &blockchain, round_context &context, std::condition_variable &cv, std::mutex &mutex, uint64_t curr_height)
{
// TODO(doyle): Handle this error better
// assert(context.wait_next_block.round <= static_cast<uint8_t>(-1));
context.wait_next_block.round++;
pulse::time_point const start_time = context.wait_next_block.round_0_start_time +
(context.wait_next_block.round * service_nodes::PULSE_TIME_PER_BLOCK);
if (auto now = pulse::clock::now(); now < start_time)
{
auto duration = std::chrono::duration_cast<std::chrono::seconds>(start_time - now).count();
MGINFO("Pulse: Sleeping " << duration << "s until pulse round " << +context.wait_next_block.round << " commences for block " << curr_height);
uint64_t height = 0;
{
std::unique_lock lock{mutex};
cv.wait_until(lock, start_time, [&blockchain, start_time, &height, curr_height]() {
bool wakeup_time = pulse::clock::now() >= start_time;
height = blockchain.get_current_blockchain_height(true /*lock*/);
return wakeup_time || (height != curr_height);
});
}
if (height != curr_height)
{
MGINFO("Pulse: Blockchain height changed during sleep from " << curr_height << " to " << height << ", re-evaluating pulse block");
return round_state::wait_next_block;
}
}
return round_state::wait_for_round;
}
} // anonymous namespace
bool pulse::state::block_added(const cryptonote::block& block, const std::vector<cryptonote::transaction>&, cryptonote::checkpoint_t const *)
{
// TODO(doyle): Better check than heuristics.
cryptonote::pulse_header null_header = {};
if (block.signatures.size() != service_nodes::PULSE_BLOCK_REQUIRED_SIGNATURES || std::memcmp(&block.pulse, &null_header, sizeof(null_header) == 0))
last_miner_block = cryptonote::get_block_height(block);
block_added_cv.notify_one();
return true;
}
void pulse::main(pulse::state &state, void *quorumnet_state, cryptonote::core &core)
{
cryptonote::Blockchain &blockchain = core.get_blockchain_storage();
service_nodes::service_node_keys const &key = core.get_service_keys();
bool base_block_initialized = false;
cryptonote::block base_pulse_block = {};
crypto::hash top_hash = {};
uint64_t pulse_height = 0;
std::mutex pulse_mutex;
round_context context = {};
uint64_t const hf16_height = cryptonote::HardFork::get_hardcoded_hard_fork_height(blockchain.nettype(), cryptonote::network_version_16);
if (hf16_height == HardFork::INVALID_HF_VERSION_HEIGHT)
return;
for (auto round_state = round_state::wait_next_block;;)
{
switch (round_state)
{
default:
break;
case round_state::wait_next_block:
{
uint64_t curr_height = blockchain.get_current_blockchain_height(true /*lock*/);
#if 0
uint64_t starting_height = std::max(hf16_height, state.last_miner_block.load());
#else
uint64_t starting_height = curr_height - 1; // (DEBUG): Make next block timestamps relative to previous block.
#endif
context = {};
//
// NOTE: Sleep until Blockchain is ready to produce Pulse Blocks
//
if (curr_height < starting_height)
{
MGINFO("Pulse: Network at block " << curr_height << " is not ready for Pulse until block " << starting_height << ", worker going to sleep");
std::unique_lock lock{pulse_mutex};
state.block_added_cv.wait(lock, [&state, &blockchain, hf16_height, &curr_height, &starting_height]() {
curr_height = blockchain.get_current_blockchain_height(true /*lock*/);
starting_height = std::max(hf16_height, state.last_miner_block.load());
return curr_height >= starting_height;
});
}
//
// NOTE: If already processing pulse for height, wait for next height
//
if (pulse_height == curr_height)
{
MGINFO("Pulse: Network is currently producing block " << pulse_height << ", sleeping until next block");
std::unique_lock lock{pulse_mutex};
state.block_added_cv.wait(lock, [&blockchain, &curr_height, pulse_height]() {
curr_height = blockchain.get_current_blockchain_height(true /*lock*/);
return curr_height != pulse_height;
});
}
pulse_height = curr_height;
top_hash = blockchain.get_block_id_by_height(pulse_height - 1);
if (top_hash == crypto::null_hash)
{
MERROR("Pulse: Block hash for height " << pulse_height << " does not exist!");
continue;
}
//
// NOTE: Get the round start time
//
if (!base_block_initialized || (cryptonote::get_block_height(base_pulse_block) != starting_height))
{
std::vector<std::pair<cryptonote::blobdata, cryptonote::block>> block;
if (!blockchain.get_blocks(starting_height, 1, block))
{
MERROR("Pulse: Could not query block " << starting_height << " from the DB, unable to continue with Pulse");
continue;
}
base_block_initialized = true;
base_pulse_block = std::move(block[0].second);
}
auto base_timestamp = pulse::time_point(std::chrono::seconds(base_pulse_block.timestamp));
uint64_t delta_height = curr_height - cryptonote::get_block_height(base_pulse_block);
context.wait_next_block.round_0_start_time = base_timestamp + (delta_height * service_nodes::PULSE_TIME_PER_BLOCK);
//
// NOTE: Determine Pulse Round
//
if (auto now = pulse::clock::now(); now < context.wait_next_block.round_0_start_time)
{
context.wait_next_block.round = 0;
}
else
{
auto const time_since_block = pulse::clock::now() - context.wait_next_block.round_0_start_time;
size_t pulse_round_usize = time_since_block / service_nodes::PULSE_TIME_PER_BLOCK;
// TODO(doyle): We need to handle this error better.
// assert(pulse_round_usize < static_cast<uint8_t>(-1));
context.wait_next_block.round = static_cast<uint8_t>(pulse_round_usize);
}
round_state = round_state::wait_for_round;
}
break;
case round_state::wait_for_round:
{
context.wait_for_round = {};
pulse::time_point const start_time = context.wait_next_block.round_0_start_time +
(context.wait_next_block.round * service_nodes::PULSE_TIME_PER_BLOCK);
context.wait_for_handshakes.end_time = start_time + std::chrono::seconds(10);
context.wait_for_other_validator_handshake_bitsets.end_time = context.wait_for_handshakes.end_time + std::chrono::seconds(10);
//
// NOTE: Derive quorum for pulse round
//
{
context.wait_for_round.quorum =
service_nodes::generate_pulse_quorum(blockchain.nettype(),
blockchain.get_db(),
pulse_height - 1,
blockchain.get_service_node_list().get_block_leader().key,
blockchain.get_current_hard_fork_version(),
blockchain.get_service_node_list().active_service_nodes_infos(),
context.wait_next_block.round);
if (!service_nodes::verify_pulse_quorum_sizes(context.wait_for_round.quorum)) // Insufficient Service Nodes for quorum
{
MGINFO("Pulse: Insufficient Service Nodes to execute Pulse on height " << pulse_height << ", we require a PoW miner block. Sleeping until next block.");
round_state = round_state::wait_next_block;
continue;
}
}
//
// NOTE: Determine quorum participation
//
if (key.pub == context.wait_for_round.quorum.workers[0])
{
MGINFO("Pulse: We are the block producer for height " << pulse_height << " in round " << +context.wait_next_block.round << ", awaiting validator handshake bitsets.");
context.wait_for_round.producer = true;
round_state = round_state::wait_for_other_validator_handshake_bitsets;
}
else
{
bool validator = false;
for (crypto::public_key const &validator_key : context.wait_for_round.quorum.validators)
{
validator = (validator_key == key.pub);
if (validator) break;
}
if (!validator)
{
MGINFO("Pulse: We are not a pulse validator for height " << pulse_height << " in round " << +context.wait_next_block.round << ". Waiting for next pulse round or block.");
round_state = sleep_until_next_block_or_round(blockchain, context, state.block_added_cv, pulse_mutex, pulse_height);
break;
}
}
//
// NOTE: Sleep until round starts or block added
//
// TODO(doyle): DRY sleep code
if (auto now = pulse::clock::now(); now < start_time)
{
auto duration = std::chrono::duration_cast<std::chrono::seconds>(start_time - now).count();
MGINFO("Pulse: Sleeping " << duration << "s until pulse round " << +context.wait_next_block.round << " commences for block " << pulse_height);
uint64_t height = 0;
{
std::unique_lock lock{pulse_mutex};
state.block_added_cv.wait_until(lock, start_time, [&blockchain, start_time, &height, pulse_height]() {
bool wakeup_time = pulse::clock::now() >= start_time;
height = blockchain.get_current_blockchain_height(true /*lock*/);
return wakeup_time || (height != pulse_height);
});
}
if (height != pulse_height)
{
MGINFO("Pulse: Blockchain height changed during sleep from " << pulse_height << " to " << height << ", re-evaluating pulse block");
round_state = round_state::wait_next_block;
break;
}
}
if (!context.wait_for_round.producer)
{
try
{
MGINFO("Pulse: We are a pulse validator for height " << pulse_height << " in round " << +context.wait_next_block.round << ", sending handshake bit to quorum and collecting other validator handshakes.");
cryptonote::quorumnet_send_pulse_validator_handshake_bit(quorumnet_state, context.wait_for_round.quorum, top_hash);
round_state = round_state::wait_for_handshakes;
}
catch (std::exception const &e)
{
MERROR("Attempting to invoke and send a Pulse participation handshake unexpectedly failed. " << e.what());
round_state = sleep_until_next_block_or_round(blockchain, context, state.block_added_cv, pulse_mutex, pulse_height);
}
}
}
break;
case round_state::wait_for_handshakes:
{
// TODO(doyle): We need to sleep on this step, otherwise we busy loop on handshakes
// and same for the bitset step
bool timed_out = pulse::clock::now() >= context.wait_for_handshakes.end_time;
bool all_handshakes = context.wait_for_handshakes.all_received();
if (all_handshakes || timed_out)
{
std::bitset<8 * sizeof(context.wait_for_handshakes.validator_bits)> bitset = context.wait_for_handshakes.validator_bits;
bool missing_handshakes = timed_out && !all_handshakes;
MGINFO("Pulse: Collected validator handshakes " << bitset << (missing_handshakes ? ", we timed out and some handshakes were not seen! " : ". ") << "Sending handshake bitset and collecting other validator bitsets.");
try
{
cryptonote::quorumnet_send_pulse_validator_handshake_bitset(quorumnet_state, context.wait_for_round.quorum, top_hash, context.wait_for_handshakes.validator_bits);
round_state = round_state::wait_for_other_validator_handshake_bitsets;
}
catch(std::exception const &e)
{
MERROR("Attempting to invoke and send a Pulse participation handshake bitset unexpectedly failed. " << e.what());
round_state = sleep_until_next_block_or_round(blockchain, context, state.block_added_cv, pulse_mutex, pulse_height);
}
}
}
break;
case round_state::wait_for_other_validator_handshake_bitsets:
{
bool all_bitsets = context.wait_for_other_validator_handshake_bitsets.all_received();
bool timed_out = pulse::clock::now() >= context.wait_for_other_validator_handshake_bitsets.end_time;
if (timed_out || all_bitsets)
{
bool missing_bitsets = timed_out && !all_bitsets;
MGINFO("Pulse: Collected " << context.wait_for_other_validator_handshake_bitsets.received_bitsets_count << " handshake bitsets" << (missing_bitsets ? ", we timed out and some bitsets were not seen!" : ""));
std::map<uint16_t, int> most_common_validator_bitset;
for (uint16_t bits : context.wait_for_other_validator_handshake_bitsets.received_bitsets)
most_common_validator_bitset[bits]++;
uint16_t most_common_bitset = most_common_validator_bitset.begin()->first;
uint16_t count = most_common_validator_bitset.begin()->second;
if (count < (service_nodes::PULSE_QUORUM_NUM_VALIDATORS * 6 / 10))
{
// Less than 60% of the validators can't come to agreement
// about which validators are online, we wait until the
// next round.
MGINFO("Pulse: We heard back from less than 60% of the validators, waiting for next round.");
round_state = sleep_until_next_block_or_round(blockchain, context, state.block_added_cv, pulse_mutex, pulse_height);
}
else
{
std::bitset<8 * sizeof(most_common_bitset)> bitset = most_common_bitset;
MGINFO("Pulse: " << count << " validators agreed on the participating nodes in the quorum " << bitset << (context.wait_for_round.producer ? "" : "Awaiting block template from block producer"));
context.submit_block_template.validator_bitset = most_common_bitset;
if (context.wait_for_round.producer)
round_state = context.wait_for_round.producer ? round_state::submit_block_template : round_state::wait_next_block;
}
}
}
break;
case round_state::submit_block_template:
{
assert(context.wait_for_round.producer);
std::vector<service_nodes::service_node_pubkey_info> list_state = blockchain.get_service_node_list().get_service_node_list_state({key.pub});
if (list_state.empty())
{
MGINFO("Pulse: Block producer (us) is not available on the service node list, waiting until next round");
round_state = round_state::wait_next_block;
break;
}
// TODO(doyle): These checks can be done earlier?
std::shared_ptr<const service_nodes::service_node_info> info = list_state[0].info;
if (!info->is_active())
{
MGINFO("Pulse: Block producer (us) is not an active service node, waiting until next round");
round_state = sleep_until_next_block_or_round(blockchain, context, state.block_added_cv, pulse_mutex, pulse_height);
break;
}
MGINFO("Pulse: Validators are handshaken and ready, sending block template from producer (us) to validators.\n");
service_nodes::payout block_producer_payouts = service_nodes::service_node_info_to_payout(key.pub, *info);
cryptonote::block block = {};
uint64_t expected_reward = 0;
blockchain.create_next_pulse_block_template(block, block_producer_payouts, pulse_height, expected_reward);
cryptonote::block_verification_context bvc = {};
core.handle_block_found(block, bvc);
round_state = round_state::wait_next_block;
}
break;
}
pulse::time_point pump_messages_until = {};
switch(round_state)
{
default: break;
case round_state::wait_for_handshakes:
pump_messages_until = context.wait_for_handshakes.end_time;
break;
case round_state::wait_for_other_validator_handshake_bitsets:
pump_messages_until = context.wait_for_other_validator_handshake_bitsets.end_time;
break;
}
if (auto now = pulse::clock::now(); now < pump_messages_until)
{
auto duration = std::chrono::duration_cast<std::chrono::seconds>(pump_messages_until - now);
MGINFO("Pulse: Pumping messages from quorumnet for " << duration.count() << "s or until all messages received.");
}
pulse::message msg = {};
while (cryptonote::quorumnet_pulse_pump_messages(quorumnet_state, msg, pump_messages_until))
{
bool relay_message = false;
bool finish_pumping = false;
if (msg.quorum_position >= static_cast<int>(context.wait_for_round.quorum.validators.size()))
continue;
crypto::public_key const &validator_key = context.wait_for_round.quorum.validators[msg.quorum_position];
if (msg.type == pulse::message_type::handshake)
{
if (round_state != round_state::wait_for_handshakes)
continue;
// TODO(doyle): DRY
// NOTE: Validate Signature
{
auto buf = tools::memcpy_le(top_hash.data, msg.quorum_position);
crypto::hash hash;
crypto::cn_fast_hash(buf.data(), buf.size(), hash);
if (!crypto::check_signature(hash, validator_key, msg.signature))
{
MERROR("Signature from pulse handshake bit does not validate with node " << msg.quorum_position << ":" << lokimq::to_hex(tools::view_guts(validator_key)) << ", at height " << pulse_height << "; Validator signing outdated height or bad handshake data");
continue;
}
}
relay_message = ((context.wait_for_handshakes.validator_bits & msg.quorum_position) == 0);
context.wait_for_handshakes.validator_bits |= msg.quorum_position;
finish_pumping = context.wait_for_handshakes.all_received();
}
else if (msg.type == pulse::message_type::handshake_bitset)
{
if (round_state != round_state::wait_for_other_validator_handshake_bitsets)
continue;
// NOTE: Validate Signature
{
auto buf = tools::memcpy_le(msg.validator_bitset, top_hash.data, msg.quorum_position);
crypto::hash hash;
crypto::cn_fast_hash(buf.data(), buf.size(), hash);
if (!crypto::check_signature(hash, validator_key, msg.signature))
{
MERROR("Signature from pulse handshake bitset does not validate with node " << msg.quorum_position << ":" << lokimq::to_hex(tools::view_guts(validator_key)) << ", at height " << pulse_height << "; Validator signing outdated height or bad handshake data");
continue;
}
}
uint16_t prev_bitset = context.wait_for_other_validator_handshake_bitsets.received_bitsets[msg.quorum_position];
relay_message = prev_bitset != msg.validator_bitset;
context.wait_for_other_validator_handshake_bitsets.received_bitsets[msg.quorum_position] = msg.validator_bitset;
if (prev_bitset == 0)
context.wait_for_other_validator_handshake_bitsets.received_bitsets_count++;
finish_pumping = context.wait_for_other_validator_handshake_bitsets.all_received();
}
else
{
assert(msg.type == pulse::message_type::invalid);
}
if (relay_message)
cryptonote::quorumnet_pulse_relay_message_to_quorum(quorumnet_state, msg, context.wait_for_round.quorum);
if (finish_pumping)
break;
}
}
}

View File

@ -0,0 +1,52 @@
#pragma once
#include <atomic>
#include <cstdint>
#include <condition_variable>
#include "cryptonote_basic/cryptonote_basic_impl.h"
#include "crypto/crypto.h"
namespace cryptonote
{
class core;
class transaction;
struct block;
struct checkpoint_t;
};
namespace service_nodes
{
struct service_node_keys;
};
namespace pulse
{
using clock = std::chrono::system_clock;
using time_point = std::chrono::time_point<clock>;
enum struct message_type
{
invalid,
handshake,
handshake_bitset,
};
struct message
{
message_type type;
int quorum_position;
crypto::signature signature;
uint16_t validator_bitset;
};
struct state : public cryptonote::BlockAddedHook
{
std::condition_variable block_added_cv;
std::atomic<uint64_t> last_miner_block;
bool block_added(const cryptonote::block& block, const std::vector<cryptonote::transaction>& txs, cryptonote::checkpoint_t const *checkpoint) override;
};
void main(pulse::state &state, void *quorumnet_state, cryptonote::core &core);
} // namespace pulse

View File

@ -8,7 +8,7 @@
namespace service_nodes {
constexpr auto PULSE_TIME_PER_BLOCK = std::chrono::minutes(2);
constexpr size_t PULSE_QUORUM_ENTROPY_LAG = 21; // How many blocks back from the tip of the Blockchain to source entropy for the Pulse quorums.
constexpr size_t PULSE_QUORUM_NUM_VALIDATORS = 0;
constexpr size_t PULSE_QUORUM_NUM_VALIDATORS = 11;
constexpr size_t PULSE_QUORUM_SIZE = PULSE_QUORUM_NUM_VALIDATORS + 1 /*Leader*/;
constexpr size_t pulse_min_service_nodes(cryptonote::network_type nettype)
@ -26,7 +26,7 @@ namespace service_nodes {
return result;
}
constexpr size_t PULSE_BLOCK_REQUIRED_SIGNATURES = 0; // A block must have exactly N signatures to be considered properly
constexpr size_t PULSE_BLOCK_REQUIRED_SIGNATURES = 7; // A block must have exactly N signatures to be considered properly
static_assert(PULSE_QUORUM_NUM_VALIDATORS >= PULSE_BLOCK_REQUIRED_SIGNATURES);
static_assert(PULSE_QUORUM_ENTROPY_LAG >= PULSE_QUORUM_SIZE, "We need to pull atleast PULSE_QUORUM_SIZE number of blocks from the Blockchain, we can't if the amount of blocks to go back from the tip of the Blockchain is less than the blocks we need.");

View File

@ -32,6 +32,7 @@
#include "cryptonote_core/service_node_rules.h"
#include "cryptonote_core/tx_blink.h"
#include "cryptonote_core/tx_pool.h"
#include "cryptonote_core/pulse.h"
#include "quorumnet_conn_matrix.h"
#include "cryptonote_config.h"
#include "common/random.h"
@ -88,6 +89,10 @@ struct QnetState {
// FIXME:
//std::chrono::steady_clock::time_point last_blink_cleanup = std::chrono::steady_clock::now();
std::mutex pulse_message_queue_mutex;
std::condition_variable pulse_message_queue_cv;
std::queue<pulse::message> pulse_message_queue;
QnetState(cryptonote::core &core) : core{core} {}
static QnetState &from(void* obj) {
@ -161,7 +166,7 @@ public:
peer_info(
QnetState& qnet,
quorum_type q_type,
std::shared_ptr<const quorum> &quorum,
const quorum *quorum,
bool opportunistic = true,
exclude_set exclude = {}
)
@ -423,7 +428,7 @@ void relay_obligation_votes(void *obj, const std::vector<service_nodes::quorum_v
continue;
}
peer_info pinfo{qnet, vote.type, quorum};
peer_info pinfo{qnet, vote.type, quorum.get()};
if (!pinfo.my_position_count) {
MWARNING("Invalid vote relay: vote to relay does not include this service node");
continue;
@ -989,6 +994,18 @@ void extract_signature_values(bt_dict_consumer& data, std::string_view key, std:
if (it != signatures.end()) throw std::invalid_argument("Invalid blink signature data: " + std::string{key} + " size < i size");
}
crypto::signature convert_string_view_bytes_to_signature(std::string_view sig_str)
{
if (sig_str.size() != sizeof(crypto::signature))
throw std::invalid_argument("Invalid signature data size: " + std::to_string(sizeof(crypto::signature)));
crypto::signature result;
std::memcpy(&result, sig_str.data(), sizeof(crypto::signature));
if (!result) throw std::invalid_argument("Invalid signature data: null signature given");
return result;
}
/// A "blink_sign" message is used to relay signatures from one quorum member to other members.
/// Fields are:
///
@ -1073,13 +1090,7 @@ void handle_blink_signature(Message& m, QnetState& qnet) {
// s - list of 64-byte signatures
extract_signature_values(data, "s", signatures, [](bt_list_consumer& l) {
auto sig_str = l.consume_string_view();
if (sig_str.size() != sizeof(crypto::signature))
throw std::invalid_argument("Invalid blink signature data: invalid signature");
crypto::signature s;
std::memcpy(&s, sig_str.data(), sizeof(crypto::signature));
if (!s) throw std::invalid_argument("Invalid blink signature data: invalid null signature");
return s;
return convert_string_view_bytes_to_signature(l.consume_string_view());
});
auto blink_quorums = get_blink_quorums(blink_height, qnet.core.get_service_node_list(), &checksum); // throws if bad quorum or checksum mismatch
@ -1364,6 +1375,220 @@ void handle_blink_success(Message& m) {
common_blink_response(tag, cryptonote::blink_result::accepted, ""s);
}
//
// Pulse
//
void send_pulse_validator_handshake_bit_or_bitset(void *self, bool sending_bitset, service_nodes::quorum const &quorum, crypto::hash const &top_hash, uint16_t validator_bitset)
{
QnetState &qnet = *static_cast<QnetState *>(self);
cryptonote::core &core = qnet.core;
// NOTE: Invariants
if (!core.service_node())
throw std::runtime_error("Pulse: Daemon is not a service node.");
// NOTE: Check we are in quorum
std::unordered_map<crypto::public_key, uint16_t> validator_to_quorum_index;
for (size_t i = 0; i < quorum.validators.size(); i++)
validator_to_quorum_index[quorum.validators[i]] = i;
// NOTE: Generate list of nodes to contact
peer_info peer_list{qnet, quorum_type::pulse, &quorum, false /*opportunistic*/};
if (peer_list.my_position.empty() || peer_list.my_position[0])
throw std::runtime_error("Pulse: Could not find this node's public key in quorum");
int const my_quorum_position = peer_list.my_position[0];
std::string data;
auto command = (sending_bitset) ? "pulse.validator_bitset"sv : "pulse.validator_bit"sv;
if (sending_bitset)
{
auto buf = tools::memcpy_le(validator_bitset, top_hash.data, my_quorum_position);
crypto::hash hash;
crypto::cn_fast_hash(buf.data(), buf.size(), hash);
crypto::signature signature;
crypto::generate_signature(hash, core.get_service_keys().pub, core.get_service_keys().key, signature);
data = bt_serialize<bt_dict>({
{"b", validator_bitset},
{"q", my_quorum_position},
{"s", tools::view_guts(signature)}
});
}
else
{
auto buf = tools::memcpy_le(top_hash.data, my_quorum_position);
crypto::hash hash;
crypto::cn_fast_hash(buf.data(), buf.size(), hash);
crypto::signature signature;
crypto::generate_signature(hash, core.get_service_keys().pub, core.get_service_keys().key, signature);
data = bt_serialize<bt_dict>({
{"q", my_quorum_position},
{"s", tools::view_guts(signature)}
});
}
// NOTE: Dispatch
for (auto const &remote : peer_list.remotes) {
auto const &pubkey = remote.first;
auto const &[x25519_pubkey_crypto, connection_location] = remote.second;
std::string x25519_pubkey = get_data_as_string(x25519_pubkey_crypto);
qnet.core.get_lmq().send(x25519_pubkey, command, data, send_option::hint{connection_location});
}
}
void send_pulse_validator_handshake_bit(void *self, service_nodes::quorum const &quorum, crypto::hash const &top_hash)
{
send_pulse_validator_handshake_bit_or_bitset(self, false /*sending_bitset*/, quorum, top_hash, 0);
}
void send_pulse_validator_handshake_bitset(void *self, service_nodes::quorum const &quorum, crypto::hash const &top_hash, uint16_t validator_bitset)
{
send_pulse_validator_handshake_bit_or_bitset(self, true /*sending_bitset*/, quorum, top_hash, validator_bitset);
}
void handle_pulse_participation_bit_or_bitset(Message &m, QnetState& qnet, bool bitset)
{
if (m.data.size() != 1)
throw std::runtime_error("Rejecting pulse participation handshake: expected one data entry not " + std::to_string(m.data.size()));
int quorum_position = -1;
uint16_t validator_bitset = 0;
crypto::signature signature = {};
// NOTE: Extract Data
bt_dict_consumer data{m.data[0]};
constexpr auto VALIDATOR_BITSET_TAG = "b"sv;
constexpr auto QUORUM_POSITION_TAG = "q"sv;
constexpr auto SIGNATURE_TAG = "s"sv;
if (bitset)
{
std::string_view INVALID_ARG_PREFIX = bitset ? "Invalid pulse validator bitset: missing required field '"sv
: "Invalid pulse validator bit: missing required field '"sv;
if (data.skip_until(VALIDATOR_BITSET_TAG))
validator_bitset = data.consume_integer<uint16_t>();
else
throw std::invalid_argument(std::string(INVALID_ARG_PREFIX) + std::string(VALIDATOR_BITSET_TAG) + "'");
if (data.skip_until(QUORUM_POSITION_TAG))
quorum_position = data.consume_integer<int>();
else
throw std::invalid_argument(std::string(INVALID_ARG_PREFIX) + std::string(QUORUM_POSITION_TAG) + "'");
if (data.skip_until(SIGNATURE_TAG)) {
auto sig_str = data.consume_string_view();
signature = convert_string_view_bytes_to_signature(sig_str);
} else {
throw std::invalid_argument(std::string(INVALID_ARG_PREFIX) + std::string(SIGNATURE_TAG) + "'");
}
assert(validator_bitset != -1);
assert(quorum_position != -1);
assert(signature);
}
else
{
constexpr auto INVALID_ARG_PREFIX = "Invalid pulse validator bit: missing required field '"sv;
if (data.skip_until(QUORUM_POSITION_TAG))
quorum_position = data.consume_integer<int>();
else
throw std::invalid_argument(std::string(INVALID_ARG_PREFIX) + std::string(QUORUM_POSITION_TAG) + "'");
if (data.skip_until(SIGNATURE_TAG)) {
auto sig_str = data.consume_string_view();
signature = convert_string_view_bytes_to_signature(sig_str);
} else {
throw std::invalid_argument(std::string(INVALID_ARG_PREFIX) + std::string(SIGNATURE_TAG) + "'");
}
assert(quorum_position != -1);
assert(signature);
}
pulse::message msg = {};
msg.signature = signature;
msg.quorum_position = quorum_position;
if (bitset)
{
msg.type = pulse::message_type::handshake_bitset;
msg.validator_bitset = validator_bitset;
}
else
{
msg.type = pulse::message_type::handshake;
}
std::unique_lock lock{qnet.pulse_message_queue_mutex};
qnet.pulse_message_queue.push(std::move(msg));
}
void pulse_relay_message_to_quorum(void *self, pulse::message const &msg, service_nodes::quorum const &quorum)
{
using message_type = pulse::message_type;
std::string data;
switch(msg.type)
{
case message_type::invalid:
{
assert(!"Invalid Code Path");
return;
}
case message_type::handshake:
{
data = bt_serialize<bt_dict>({
{"q", msg.quorum_position},
{"s", tools::view_guts(msg.signature)}
});
}
break;
case message_type::handshake_bitset:
{
data = bt_serialize<bt_dict>({
{"b", msg.validator_bitset},
{"q", msg.quorum_position},
{"s", tools::view_guts(msg.signature)}
});
}
break;
}
peer_info::exclude_set relay_exclude;
relay_exclude.insert(quorum.validators[msg.quorum_position]);
QnetState &qnet = *static_cast<QnetState *>(self);
peer_info peer_list{qnet, quorum_type::pulse, &quorum, true /*opportunistic*/, std::move(relay_exclude)};
char const *command = msg.type == message_type::handshake_bitset ? "pulse.validator_bitset" : "pulse.validator_bit";
peer_list.relay_to_peers(command, data);
}
bool pulse_message_queue_pump_messages(void *self, pulse::message &msg, pulse::time_point sleep_until)
{
auto qnet = static_cast<QnetState *>(self);
std::unique_lock lock{qnet->pulse_message_queue_mutex};
bool has_message = false;
qnet->pulse_message_queue_cv.wait_until(lock, sleep_until, [qnet, sleep_until, &has_message]() {
has_message = qnet->pulse_message_queue.size();
return has_message || pulse::clock::now() >= sleep_until;
});
if (has_message)
{
lock.lock();
msg = std::move(qnet->pulse_message_queue.front());
qnet->pulse_message_queue.pop();
}
return has_message;
}
} // end empty namespace
@ -1374,6 +1599,13 @@ void init_core_callbacks() {
cryptonote::quorumnet_delete = delete_qnetstate;
cryptonote::quorumnet_relay_obligation_votes = relay_obligation_votes;
cryptonote::quorumnet_send_blink = send_blink;
cryptonote::quorumnet_send_pulse_validator_handshake_bit = send_pulse_validator_handshake_bit;
cryptonote::quorumnet_send_pulse_validator_handshake_bitset = send_pulse_validator_handshake_bitset;
cryptonote::quorumnet_pulse_pump_messages = pulse_message_queue_pump_messages;
cryptonote::quorumnet_pulse_relay_message_to_quorum = pulse_relay_message_to_quorum;
}
namespace {
@ -1413,6 +1645,11 @@ void setup_endpoints(QnetState& qnet) {
.add_command("good", handle_blink_success)
;
lmq.add_category("pulse", Access{AuthLevel::none, true /*remote sn*/, true /*local sn*/}, 1 /*reserved thread*/)
.add_request_command("validator_bit", [&qnet](Message& m) { handle_pulse_participation_bit_or_bitset(m, qnet, false /*bitset*/); })
.add_request_command("validator_bitset", [&qnet](Message& m) { handle_pulse_participation_bit_or_bitset(m, qnet, true /*bitset*/); })
;
// Compatibility aliases. No longer used since 7.1.4, but can still be received from previous
// 7.1.x nodes.
// Transition plan: