mirror of
https://github.com/oxen-io/oxen-storage-server.git
synced 2023-12-13 21:00:26 +01:00
Optimize random message retrieval
The approach being used here with an offset is painfully inefficient, and has a race condition; this switches it to something better. This also allows elimination of the ridiculous "util::uniform_distribution_portable" call which didn't produce anything portable at all.
This commit is contained in:
parent
b89872c478
commit
6b70b3fe71
|
@ -1100,34 +1100,6 @@ std::pair<MessageTestStatus, std::string> ServiceNode::process_storage_test_req(
|
|||
return {MessageTestStatus::SUCCESS, std::move(item.data)};
|
||||
}
|
||||
|
||||
std::optional<Item> ServiceNode::select_random_message() {
|
||||
|
||||
uint64_t message_count;
|
||||
if (!db_->get_message_count(message_count)) {
|
||||
OXEN_LOG(err, "Could not count messages in the database");
|
||||
return {};
|
||||
}
|
||||
|
||||
OXEN_LOG(debug, "total messages: {}", message_count);
|
||||
|
||||
if (message_count == 0) {
|
||||
OXEN_LOG(debug, "No messages in the database to initiate a peer test");
|
||||
return {};
|
||||
}
|
||||
|
||||
// SNodes don't have to agree on this, rather they should use different
|
||||
// messages
|
||||
const auto msg_idx = util::uniform_distribution_portable(message_count);
|
||||
|
||||
auto item = std::make_optional<Item>();
|
||||
if (!db_->retrieve_by_index(msg_idx, *item)) {
|
||||
OXEN_LOG(err, "Could not retrieve message by index: {}", msg_idx);
|
||||
return {};
|
||||
}
|
||||
|
||||
return item;
|
||||
}
|
||||
|
||||
void ServiceNode::initiate_peer_test() {
|
||||
|
||||
std::lock_guard guard(sn_mutex_);
|
||||
|
@ -1162,9 +1134,9 @@ void ServiceNode::initiate_peer_test() {
|
|||
}
|
||||
|
||||
/// 2. Storage Testing: initiate a testing request with a randomly selected message
|
||||
if (auto item = select_random_message()) {
|
||||
OXEN_LOG(trace, "Selected random message: {}, {}", item->hash, item->data);
|
||||
send_storage_test_req(testee, test_height, *item);
|
||||
if (Item item; db_->retrieve_random(item)) {
|
||||
OXEN_LOG(trace, "Selected random message: {}, {}", item.hash, item.data);
|
||||
send_storage_test_req(testee, test_height, item);
|
||||
} else {
|
||||
OXEN_LOG(debug, "Could not select a message for testing");
|
||||
}
|
||||
|
|
|
@ -149,9 +149,6 @@ class ServiceNode {
|
|||
/// Check if it is our turn to test and initiate peer test if so
|
||||
void initiate_peer_test();
|
||||
|
||||
/// Select a random message from our database, return nullopt on error
|
||||
std::optional<storage::Item> select_random_message(); // mutex not needed
|
||||
|
||||
// Initiate node ping tests
|
||||
void test_reachability(const sn_record_t& sn, int previous_failures);
|
||||
|
||||
|
|
|
@ -48,9 +48,8 @@ class Database {
|
|||
// Return the total number of messages stored
|
||||
bool get_message_count(uint64_t& count);
|
||||
|
||||
// Get message by `index` (must be smaller than the result of
|
||||
// `get_message_count`).
|
||||
bool retrieve_by_index(uint64_t index, storage::Item& item);
|
||||
// Get random message. Returns false if there are no messages (or the db query failed)
|
||||
bool retrieve_random(storage::Item& item);
|
||||
|
||||
// Get message by `msg_hash`, return true if found
|
||||
bool retrieve_by_hash(const std::string& msg_hash, storage::Item& item);
|
||||
|
@ -69,7 +68,7 @@ class Database {
|
|||
sqlite3_stmt* get_all_stmt;
|
||||
sqlite3_stmt* get_stmt;
|
||||
sqlite3_stmt* get_row_count_stmt;
|
||||
sqlite3_stmt* get_by_index_stmt;
|
||||
sqlite3_stmt* get_random_stmt;
|
||||
sqlite3_stmt* get_by_hash_stmt;
|
||||
sqlite3_stmt* delete_expired_stmt;
|
||||
sqlite3_stmt* page_count_stmt;
|
||||
|
|
|
@ -16,7 +16,7 @@ Database::~Database() {
|
|||
sqlite3_finalize(get_all_stmt);
|
||||
sqlite3_finalize(get_stmt);
|
||||
sqlite3_finalize(get_row_count_stmt);
|
||||
sqlite3_finalize(get_by_index_stmt);
|
||||
sqlite3_finalize(get_random_stmt);
|
||||
sqlite3_finalize(get_by_hash_stmt);
|
||||
sqlite3_finalize(delete_expired_stmt);
|
||||
sqlite3_finalize(page_count_stmt);
|
||||
|
@ -147,13 +147,14 @@ void Database::open_and_prepare(const std::filesystem::path& db_path) {
|
|||
throw std::runtime_error{err};
|
||||
}
|
||||
|
||||
// Don't fail on these because we can still work even if they fail
|
||||
if (int rc = sqlite3_exec(db, "PRAGMA journal_mode = WAL", nullptr, nullptr, nullptr);
|
||||
rc != SQLITE_OK)
|
||||
OXEN_LOG(critical, "Failed to set journal mode to WAL: {}", sqlite3_errstr(rc));
|
||||
OXEN_LOG(err, "Failed to set journal mode to WAL: {}", sqlite3_errstr(rc));
|
||||
|
||||
if (int rc = sqlite3_exec(db, "PRAGMA synchronous = NORMAL", nullptr, nullptr, nullptr);
|
||||
rc != SQLITE_OK)
|
||||
OXEN_LOG(critical, "Failed to set synchronous mode to NORMAL: {}", sqlite3_errstr(rc));
|
||||
OXEN_LOG(err, "Failed to set synchronous mode to NORMAL: {}", sqlite3_errstr(rc));
|
||||
|
||||
check_page_size(db);
|
||||
set_page_count(db);
|
||||
|
@ -216,9 +217,9 @@ void Database::open_and_prepare(const std::filesystem::path& db_path) {
|
|||
if (!get_row_count_stmt)
|
||||
throw std::runtime_error("could not prepare row count statement");
|
||||
|
||||
get_by_index_stmt = prepare_statement("SELECT * FROM `Data` LIMIT ?, 1;");
|
||||
if (!get_by_index_stmt)
|
||||
throw std::runtime_error("could not prepare get by index statement");
|
||||
get_random_stmt = prepare_statement("SELECT * FROM Data WHERE rowid = (SELECT rowid FROM Data ORDER BY RANDOM() LIMIT 1)");
|
||||
if (!get_random_stmt)
|
||||
throw std::runtime_error("could not prepare get random statement");
|
||||
|
||||
get_by_hash_stmt =
|
||||
prepare_statement("SELECT * FROM `Data` WHERE `Hash` = ?;");
|
||||
|
@ -312,32 +313,28 @@ static Item extract_item(sqlite3_stmt* stmt) {
|
|||
return item;
|
||||
}
|
||||
|
||||
bool Database::retrieve_by_index(uint64_t index, Item& item) {
|
||||
|
||||
sqlite3_bind_int64(get_by_index_stmt, 1, index);
|
||||
bool Database::retrieve_random(Item& item) {
|
||||
|
||||
bool success = false;
|
||||
int rc;
|
||||
while (true) {
|
||||
rc = sqlite3_step(get_by_index_stmt);
|
||||
rc = sqlite3_step(get_random_stmt);
|
||||
if (rc == SQLITE_BUSY) {
|
||||
continue;
|
||||
} else if (rc == SQLITE_DONE) {
|
||||
// Note that if the index is out of bounds, we will get here
|
||||
// returning an empty Item
|
||||
break;
|
||||
} else if (rc == SQLITE_ROW) {
|
||||
item = extract_item(get_by_index_stmt);
|
||||
item = extract_item(get_random_stmt);
|
||||
success = true;
|
||||
break;
|
||||
} else {
|
||||
OXEN_LOG(critical,
|
||||
"Could not execute `retrieve by index` db statement");
|
||||
"Could not execute `retrieve random` db statement");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
rc = sqlite3_reset(get_by_index_stmt);
|
||||
rc = sqlite3_reset(get_random_stmt);
|
||||
if (rc != SQLITE_OK) {
|
||||
OXEN_LOG(critical, "sqlite reset error: [{}], {}", rc,
|
||||
sqlite3_errmsg(db));
|
||||
|
|
|
@ -24,9 +24,6 @@ uint64_t get_time_ms();
|
|||
/// Returns a reference to a randomly seeded, thread-local RNG.
|
||||
std::mt19937_64& rng();
|
||||
|
||||
/// Returns a random number from [0, n) using `rng()`
|
||||
uint64_t uniform_distribution_portable(uint64_t n);
|
||||
|
||||
/// Returns a random number from [0, n); (copied from lokid)
|
||||
uint64_t uniform_distribution_portable(std::mt19937_64& mersenne_twister,
|
||||
uint64_t n);
|
||||
|
|
|
@ -73,10 +73,6 @@ std::mt19937_64& rng() {
|
|||
return generator;
|
||||
}
|
||||
|
||||
uint64_t uniform_distribution_portable(uint64_t n) {
|
||||
return uniform_distribution_portable(rng(), n);
|
||||
}
|
||||
|
||||
uint64_t uniform_distribution_portable(std::mt19937_64& mersenne_twister,
|
||||
uint64_t n) {
|
||||
const uint64_t secure_max =
|
||||
|
|
Loading…
Reference in a new issue