Pulse: Convert to event based pulse loop

- Respond to pulse events by submitting to the LokiMQ thread job queue
instead of managing our own queue.
This commit is contained in:
Doyle 2020-08-10 19:37:17 +10:00
parent 738231eb93
commit 55086cad8f
6 changed files with 383 additions and 477 deletions

View file

@ -280,8 +280,6 @@ namespace cryptonote
quorumnet_send_pulse_validator_handshake_bitset_proc *quorumnet_send_pulse_validator_handshake_bitset = [](void *, service_nodes::quorum const &, crypto::hash const &, uint16_t) -> void { need_core_init(); };
quorumnet_send_pulse_block_template_proc *quorumnet_send_pulse_block_template = [](void *, std::string &&blob, crypto::signature const &, service_nodes::quorum const &) -> void { need_core_init(); };
quorumnet_pulse_pump_messages_proc *quorumnet_pulse_pump_messages = [](void*, pulse::message &, pulse::time_point) -> bool { need_core_init(); return false; };
quorumnet_pulse_relay_message_to_quorum_proc *quorumnet_pulse_relay_message_to_quorum = [](void *, pulse::message const &, service_nodes::quorum const &, bool) -> void { need_core_init(); };
//-----------------------------------------------------------------------------------------------
@ -1085,16 +1083,17 @@ namespace cryptonote
void core::start_lokimq() {
update_lmq_sns(); // Ensure we have SNs set for the current block before starting
if (m_service_node)
m_pulse_thread_id = m_lmq->add_tagged_thread("pulse");
m_lmq->start();
if (m_service_node)
{
m_blockchain_storage.hook_block_added(m_pulse_state);
lokimq::TaggedThreadID pulse_thread_id = m_lmq->add_tagged_thread("pulse");
m_lmq->start();
m_lmq->job([this]() { pulse::main(m_pulse_state, m_quorumnet_state, *this); }, pulse_thread_id);
}
else
{
m_lmq->start();
m_lmq->add_timer([this]() { pulse::main(m_quorumnet_state, *this); },
std::chrono::milliseconds(500),
false,
m_pulse_thread_id);
}
}

View file

@ -104,11 +104,6 @@ namespace cryptonote
using quorumnet_send_pulse_validator_handshake_bit_proc = void (void *self, service_nodes::quorum const &quorum, crypto::hash const &top_hash);
using quorumnet_send_pulse_validator_handshake_bitset_proc = void (void *self, service_nodes::quorum const &quorum, crypto::hash const &top_hash, uint16_t handshake_bitset);
using quorumnet_send_pulse_block_template_proc = void (void *, std::string &&blob, crypto::signature const &, service_nodes::quorum const &);
// Blocking call that waits on a message from the Pulse message queue which are messages for Pulse received via QuorumNet until the end time has passed.
// end_time: Set a clock time to which the call can return if there are no more messages queued up.
// msg: When function returns true, the pumped message is written to msg.
// return: False if there is no message and the clock has passed the end_time, true if there's a message.
using quorumnet_pulse_pump_messages_proc = bool (void *self, pulse::message &msg, pulse::time_point end_time);
// Relay a Pulse message to members specified in the quorum excluding the originating message owner.
using quorumnet_pulse_relay_message_to_quorum_proc = void (void *, pulse::message const &msg, service_nodes::quorum const &quorum, bool block_producer);
@ -120,7 +115,6 @@ namespace cryptonote
extern quorumnet_send_pulse_validator_handshake_bit_proc *quorumnet_send_pulse_validator_handshake_bit;
extern quorumnet_send_pulse_validator_handshake_bitset_proc *quorumnet_send_pulse_validator_handshake_bitset;
extern quorumnet_send_pulse_block_template_proc *quorumnet_send_pulse_block_template;
extern quorumnet_pulse_pump_messages_proc *quorumnet_pulse_pump_messages;
extern quorumnet_pulse_relay_message_to_quorum_proc *quorumnet_pulse_relay_message_to_quorum;
/************************************************************************/
@ -1159,6 +1153,7 @@ namespace cryptonote
* internal use only.
*/
std::unordered_map<crypto::x25519_public_key, lokimq::AuthLevel>& _lmq_auth_level_map() { return m_lmq_auth; }
lokimq::TaggedThreadID const &pulse_thread_id() const { return *m_pulse_thread_id; }
private:
@ -1254,7 +1249,7 @@ namespace cryptonote
uint64_t height = 0, emissions = 0, fees = 0, burnt = 0;
} m_coinbase_cache;
pulse::state m_pulse_state{};
std::optional<lokimq::TaggedThreadID> m_pulse_thread_id;
};
}

File diff suppressed because it is too large Load diff

View file

@ -64,13 +64,7 @@ struct message
} block_template;
};
struct state : public cryptonote::BlockAddedHook
{
std::condition_variable wakeup_cv;
std::atomic<bool> shutdown;
bool block_added(const cryptonote::block& block, const std::vector<cryptonote::transaction>& txs, cryptonote::checkpoint_t const *checkpoint) override;
};
void main(pulse::state &state, void *quorumnet_state, cryptonote::core &core);
void main(void *quorumnet_state, cryptonote::core &core);
void handle_message(void *quorumnet_state, pulse::message const &msg);
} // namespace pulse

View file

@ -9,17 +9,17 @@ namespace service_nodes {
constexpr size_t PULSE_QUORUM_ENTROPY_LAG = 21; // How many blocks back from the tip of the Blockchain to source entropy for the Pulse quorums.
#if defined(LOKI_ENABLE_INTEGRATION_TEST_HOOKS)
constexpr auto PULSE_ROUND_TIME = 20s;
constexpr auto PULSE_WAIT_FOR_HANDSHAKES_DURATION = 5s;
constexpr auto PULSE_WAIT_FOR_OTHER_VALIDATOR_HANDSHAKES_DURATION = 5s;
constexpr auto PULSE_WAIT_FOR_BLOCK_TEMPLATE_DURATION = 5s;
constexpr auto PULSE_WAIT_FOR_HANDSHAKES_DURATION = 7s;
constexpr auto PULSE_WAIT_FOR_OTHER_VALIDATOR_HANDSHAKES_DURATION = 7s;
constexpr auto PULSE_WAIT_FOR_BLOCK_TEMPLATE_DURATION = 6s;
constexpr size_t PULSE_QUORUM_NUM_VALIDATORS = 0;
constexpr size_t PULSE_BLOCK_REQUIRED_SIGNATURES = 0;
#else
constexpr auto PULSE_ROUND_TIME = 60s;
constexpr auto PULSE_WAIT_FOR_HANDSHAKES_DURATION = 15s;
constexpr auto PULSE_WAIT_FOR_OTHER_VALIDATOR_HANDSHAKES_DURATION = 15s;
constexpr auto PULSE_WAIT_FOR_BLOCK_TEMPLATE_DURATION = 15s;
constexpr auto PULSE_WAIT_FOR_HANDSHAKES_DURATION = 20s;
constexpr auto PULSE_WAIT_FOR_OTHER_VALIDATOR_HANDSHAKES_DURATION = 20s;
constexpr auto PULSE_WAIT_FOR_BLOCK_TEMPLATE_DURATION = 20s;
constexpr size_t PULSE_QUORUM_NUM_VALIDATORS = 7;
constexpr size_t PULSE_BLOCK_REQUIRED_SIGNATURES = 7; // A block must have exactly N signatures to be considered properly

View file

@ -1541,13 +1541,11 @@ void send_pulse_validator_handshake_bit_or_bitset(void *self, bool sending_bitse
throw std::runtime_error("Pulse: Daemon is not a service node.");
pulse::message msg = {};
if (auto it = std::find_if(quorum.validators.begin(), quorum.validators.end(),
[&mypk=core.get_service_keys().pub] (auto const &pk) { return pk == mypk; }); it == quorum.validators.end())
if (auto it = std::find(quorum.validators.begin(), quorum.validators.end(), core.get_service_keys().pub); it == quorum.validators.end())
throw std::runtime_error("Pulse: Could not find this node's public key in quorum");
else
msg.handshakes.quorum_position = it - quorum.validators.begin();
auto command = (sending_bitset) ? PULSE_CMD_SEND_VALIDATOR_BITSET : PULSE_CMD_SEND_VALIDATOR_BIT;
if (sending_bitset)
{
auto buf = tools::memcpy_le(validator_bitset, top_hash.data, msg.handshakes.quorum_position);
@ -1659,11 +1657,8 @@ void handle_pulse_participation_bit_or_bitset(Message &m, QnetState& qnet, bool
msg.type = pulse::message_type::handshake;
}
{
std::unique_lock lock{qnet.pulse_message_queue_mutex};
qnet.pulse_message_queue.push(std::move(msg));
}
qnet.pulse_message_queue_cv.notify_one();
auto *self = reinterpret_cast<void *>(&qnet);
qnet.lmq.job([self, data = std::move(msg)]() { pulse::handle_message(self, data); }, qnet.core.pulse_thread_id());
}
void handle_pulse_block_template(Message &m, QnetState &qnet)
@ -1690,34 +1685,8 @@ void handle_pulse_block_template(Message &m, QnetState &qnet)
throw std::invalid_argument(std::string(INVALID_ARG_PREFIX) + std::string(PULSE_TAG_QUORUM_POSITION) + "'");
}
{
std::unique_lock lock{qnet.pulse_message_queue_mutex};
qnet.pulse_message_queue.push(std::move(msg));
}
qnet.pulse_message_queue_cv.notify_one();
}
bool pulse_message_queue_pump_messages(void *self, pulse::message &msg, pulse::time_point sleep_until)
{
auto qnet = static_cast<QnetState *>(self);
std::unique_lock lock{qnet->pulse_message_queue_mutex};
bool has_message = qnet->pulse_message_queue.size();
if (!has_message)
{
qnet->pulse_message_queue_cv.wait_until(lock, sleep_until, [qnet, sleep_until, &has_message]() {
has_message = qnet->pulse_message_queue.size();
return has_message || pulse::clock::now() >= sleep_until;
});
}
if (has_message)
{
msg = std::move(qnet->pulse_message_queue.front());
qnet->pulse_message_queue.pop();
}
return has_message;
auto *self = reinterpret_cast<void *>(&qnet);
qnet.lmq.job([self, data = std::move(msg)]() { pulse::handle_message(self, data); }, qnet.core.pulse_thread_id());
}
} // end empty namespace
@ -1735,8 +1704,6 @@ void init_core_callbacks() {
cryptonote::quorumnet_send_pulse_validator_handshake_bitset = send_pulse_validator_handshake_bitset;
cryptonote::quorumnet_send_pulse_block_template = send_pulse_block_template;
cryptonote::quorumnet_pulse_pump_messages = pulse_message_queue_pump_messages;
cryptonote::quorumnet_pulse_relay_message_to_quorum = pulse_relay_message_to_quorum;
}