Database redesign, refactor & migration

Redesigns the database to be a more appropriate, less duplicative design
using "owners" and "messages" with a foreign key between them.

Rewrites all the database code using SQLiteCpp which substantially
reduces the amount of boilerplate, duplicate code for query handling.

Makes the statement handlers thread_local for better thread safety; this
also allows the actual query to be where it is executed, rather than
having all the prepared queries in one place nowhere close to where they
are actually used.
This commit is contained in:
Jason Rhinelander 2021-06-16 18:50:12 -03:00
parent b7b0d75d4f
commit 314894d4b1
5 changed files with 673 additions and 560 deletions

View file

@ -557,9 +557,7 @@ void RequestHandler::process_client_req(
? res->result["swarm"][service_node_.own_address().pubkey_ed25519.hex()]
: res->result;
std::vector<std::string_view> msgs{req.messages.begin(), req.messages.end()};
if (auto deleted = service_node_.delete_messages(req.pubkey, msgs)) {
if (auto deleted = service_node_.delete_messages(req.pubkey, req.messages)) {
std::sort(deleted->begin(), deleted->end());
auto sig = create_signature(ed25519_sk_, req.pubkey.prefixed_hex(), req.messages, *deleted);
mine["deleted"] = std::move(*deleted);
@ -678,9 +676,7 @@ void RequestHandler::process_client_req(
? res->result["swarm"][service_node_.own_address().pubkey_ed25519.hex()]
: res->result;
std::vector<std::string_view> msgs{req.messages.begin(), req.messages.end()};
if (auto updated = service_node_.update_messages_expiry(req.pubkey, msgs, req.expiry)) {
if (auto updated = service_node_.update_messages_expiry(req.pubkey, req.messages, req.expiry)) {
std::sort(updated->begin(), updated->end());
auto sig = create_signature(ed25519_sk_, req.pubkey.prefixed_hex(), req.expiry, req.messages, *updated);
mine["updated"] = std::move(*updated);
@ -755,16 +751,16 @@ void RequestHandler::process_client_req(
Response RequestHandler::process_retrieve_all() {
std::vector<storage::Item> all_entries;
bool res = service_node_.get_all_messages(all_entries);
if (!res)
return {http::INTERNAL_SERVER_ERROR, "could not retrieve all entries"s};
std::vector<message_t> msgs;
try {
msgs = service_node_.get_all_messages();
} catch (const std::exception& e) {
return {http::INTERNAL_SERVER_ERROR, "could not retrieve all messages"s};
json messages = json::array();
for (auto& entry : all_entries)
messages.push_back({ {"data",}, {"pk", entry.pub_key} });
for (auto& m : msgs)
messages.push_back(json{{"data", std::move(}, {"pk", m.pubkey.prefixed_hex()}});
return {http::OK, json{{"messages", std::move(messages)}}};

View file

@ -338,7 +338,7 @@ void ServiceNode::record_proxy_request() { all_stats_.bump_proxy_requests(); }
void ServiceNode::record_onion_request() { all_stats_.bump_onion_requests(); }
bool ServiceNode::process_store(message_t msg) {
bool ServiceNode::process_store(message_t msg, bool* new_msg) {
std::lock_guard guard{sn_mutex_};
@ -352,8 +352,11 @@ bool ServiceNode::process_store(message_t msg) {
/// store in the database (if not already present)
if (db_->store(msg))
OXEN_LOG(trace, "saved message: {}",;
auto stored = db_->store(msg);
if (stored)
OXEN_LOG(trace, *stored ? "saved message: {}" : "message already exists: {}",;
if (new_msg)
*new_msg = stored.value_or(false);
// TODO: don't need to relay this anymore after 2.2.0 because the store itself becomes
// recursive.
@ -1278,12 +1281,9 @@ std::string ServiceNode::get_stats() const {
val["height"] = block_height_;
val["target_height"] = target_height_;
if (uint64_t total_stored; db_->get_message_count(total_stored))
val["total_stored"] = total_stored;
if (uint64_t db_pages; db_->get_used_pages(db_pages)) {
val["db_used"] = db_pages * Database::PAGE_SIZE;
val["db_max"] = Database::SIZE_LIMIT;
val["total_stored"] = db_->get_message_count();
val["db_used"] = db_->get_used_bytes();
val["db_max"] = Database::SIZE_LIMIT;
return val.dump();
@ -1312,23 +1312,21 @@ std::string ServiceNode::get_status_line() const {
s << swarm.substr(0, 4) << u8"" << swarm.substr(swarm.size()-3);
s << "(n=" << (1 + swarm_->other_nodes().size()) << ")";
if (uint64_t msgs_stored; db_->get_message_count(msgs_stored)) {
s << "; " << msgs_stored << " msgs";
if (uint64_t bytes_stored; db_->get_used_pages(bytes_stored)) {
bytes_stored *= Database::PAGE_SIZE;
s << " (";
auto oldprec = s.precision(3);
if (bytes_stored >= 999'500'000)
s << bytes_stored * 1e-9 << 'G';
else if (bytes_stored >= 999'500)
s << bytes_stored * 1e-6 << 'M';
else if (bytes_stored >= 1000)
s << bytes_stored * 1e-3 << 'k';
s << bytes_stored;
s << "B)";
s << "; " << db_->get_message_count() << " msgs";
if (auto bytes_stored = db_->get_used_bytes(); bytes_stored > 0) {
s << " (";
auto oldprec = s.precision(3);
if (bytes_stored >= 999'500'000)
s << bytes_stored * 1e-9 << 'G';
else if (bytes_stored >= 999'500)
s << bytes_stored * 1e-6 << 'M';
else if (bytes_stored >= 1000)
s << bytes_stored * 1e-3 << 'k';
s << bytes_stored;
s << "B)";
auto [window, stats] = all_stats_.get_recent_requests();

View file

@ -231,7 +231,7 @@ class ServiceNode {
/// on query failure.
std::optional<std::vector<std::string>> delete_messages(
const user_pubkey_t& pubkey,
const std::vector<std::string_view>& msg_hashes);
const std::vector<std::string>& msg_hashes);
/// Deletes all messages owned by the given pubkey with a timestamp <= `timestamp`. Returns the
/// hashes of any deleted messages (including the case where no messages are deleted), nullopt
@ -246,7 +246,7 @@ class ServiceNode {
const user_pubkey_t& pubkey,
const std::vector<std::string_view>& msg_hashes,
const std::vector<std::string>& msg_hashes,
std::chrono::system_clock::time_point new_exp);
/// Shortens the expiry time of all messages owned by the given pubkey. Expiries can only be

View file

@ -1,136 +1,113 @@
#pragma once
#include "Item.hpp"
#include "oxen_common.h"
#include <atomic>
#include <chrono>
#include <cstdint>
#include <filesystem>
#include <iostream>
#include <memory>
#include <optional>
#include <string>
#include <vector>
struct sqlite3;
struct sqlite3_stmt;
namespace oxen {
class DatabaseImpl;
// Storage database class.
class Database {
std::unique_ptr<DatabaseImpl> impl;
friend class DatabaseImpl;
// Recommended period for calling clean_expired()
inline static constexpr auto CLEANUP_PERIOD = 10s;
inline static constexpr int64_t PAGE_SIZE = 4096;
inline static constexpr int64_t SIZE_LIMIT = int64_t(3584) * 1024 * 1024; // 3.5 GB
inline static constexpr int64_t PAGE_LIMIT = SIZE_LIMIT / PAGE_SIZE;
// Constructor. Note that you *must* also set up a timer that runs periodically (every
// CLEANUP_PERIOD is recommended) and calls clean_expired().
explicit Database(const std::filesystem::path& db_path);
enum class DuplicateHandling { IGNORE, FAIL };
bool store(std::string_view hash, std::string_view pubKey, std::string_view bytes,
std::chrono::system_clock::time_point timestamp, std::chrono::system_clock::time_point expiry,
DuplicateHandling behaviour = DuplicateHandling::FAIL);
// if the database is full then print an error only once ever N errors
inline static constexpr int DB_FULL_FREQUENCY = 100;
bool store(const storage::Item& item, DuplicateHandling behaviour = DuplicateHandling::FAIL) {
return store(item.hash, item.pub_key,, item.timestamp, item.expiration, behaviour);
bool store(const message_t& msg, DuplicateHandling behaviour = DuplicateHandling::FAIL) {
return store(msg.hash, msg.pub_key,, msg.timestamp, msg.expiry, behaviour);
// Attempts to store a message in the database. Returns true if inserted, false on failure due
// to the message already existing, and nullopt if the insertion failed because the database
// is full. For other query failures, throws.
// This means `if (` will be true if inserted *or* already present; to check only
// for insertion use `ins && *ins`.
std::optional<bool> store(const message_t& msg);
bool bulk_store(const std::vector<storage::Item>& items);
void bulk_store(const std::vector<message_t>& items);
bool retrieve(const std::string& key, std::vector<storage::Item>& items,
const std::string& lastHash, int num_results = -1);
// Retrieves messages owned by pubkey received since `last_hash` (which must also be owned by
// pubkey). If last_hash is empty or not found then returns all messages (up to the limit).
// Optionally takes a maximum number of messages to return.
// Note that the `pubkey` value of the returned message_t's will be left default constructed,
// i.e. *not* filled with the given pubkey.
std::vector<message_t> retrieve(
const user_pubkey_t& pubkey,
const std::string& last_hash,
std::optional<int> num_results = std::nullopt);
// Returns the number of used database pages
bool get_used_pages(uint64_t& count);
// Retrieves all messages.
std::vector<message_t> retrieve_all();
// Return the total number of messages stored
bool get_message_count(uint64_t& count);
int64_t get_message_count();
// Get random message. Returns false if there are no messages (or the db query failed)
bool retrieve_random(storage::Item& item);
// Returns the number of distinct owner pubkeys with stored messages
int64_t get_owner_count();
// Get message by `msg_hash`, return true if found
bool retrieve_by_hash(std::string_view msg_hash, storage::Item& item);
// Returns the number of used bytes (i.e. used pages * page size) of the database
int64_t get_used_bytes();
// Removes expired messages from the database; the Database owner should call this periodically.
// Get random message. Returns nullopt if there are no messages.
std::optional<message_t> retrieve_random();
// Get message by `msg_hash`, return true if found. Note that this does *not* filter by pubkey!
std::optional<message_t> retrieve_by_hash(const std::string& msg_hash);
// Removes expired messages from the database; the `Database` instance owner should call this
// periodically.
void clean_expired();
// Deletes all messages owned by the given pubkey. Returns the hashes of any deleted messages
// on success (including the case where no messages are deleted), nullopt on query failure.
std::optional<std::vector<std::string>> delete_all(std::string_view pubkey);
std::vector<std::string> delete_all(const user_pubkey_t& pubkey);
// Delete a message owned by the given pubkey having the given hashes. Returns the hashes of
// any delete messages on success (including the case where no messages are deleted), nullopt on
// query failure.
std::optional<std::vector<std::string>> delete_by_hash(
std::string_view pubkey, const std::vector<std::string_view>& msg_hashes);
std::vector<std::string> delete_by_hash(
const user_pubkey_t& pubkey, const std::vector<std::string>& msg_hashes);
// Deletes all messages owned by the given pubkey with a timestamp <= timestamp. Returns the
// hashes of any deleted messages (including the case where no messages are deleted), nullopt on
// query failure.
std::optional<std::vector<std::string>> delete_by_timestamp(
std::string_view pubkey, std::chrono::system_clock::time_point timestamp);
std::vector<std::string> delete_by_timestamp(
const user_pubkey_t& pubkey, std::chrono::system_clock::time_point timestamp);
// Shortens the expiry time of the given messages owned by the given pubkey. Expiries can only
// be shortened (i.e. brought closer to now), not extended into the future. Returns a vector of
// hashes of messages that had their expiries updates. (Missing messages and messages that
// already had an expiry <= the given expiry value are not returned).
std::string_view pubkey,
const std::vector<std::string_view>& msg_hashes,
std::vector<std::string> update_expiry(
const user_pubkey_t& pubkey,
const std::vector<std::string>& msg_hashes,
std::chrono::system_clock::time_point new_exp
// Shortens the expiry time of all messages owned by the given pubkey. Expiries can only be
// shortened (i.e. brought closer to now), not extended into the future. Returns a vector of
// hashes of messages that had their expiries shorten.
std::string_view pubkey, std::chrono::system_clock::time_point new_exp);
struct sqlite_destructor {
void operator()(sqlite3_stmt* ptr);
void operator()(sqlite3* ptr);
using StatementPtr = std::unique_ptr<sqlite3_stmt, sqlite_destructor>;
using SqlitePtr = std::unique_ptr<sqlite3, sqlite_destructor>;
StatementPtr prepare_statement(std::string_view desc, std::string_view query);
void open_and_prepare(const std::filesystem::path& db_path);
// keep track of db full errorss so we don't print them on every store
std::atomic<int> db_full_counter = 0;
SqlitePtr db;
StatementPtr save_stmt;
StatementPtr save_or_ignore_stmt;
StatementPtr get_all_for_pk_stmt;
StatementPtr get_all_stmt;
StatementPtr get_stmt;
StatementPtr get_row_count_stmt;
StatementPtr get_random_stmt;
StatementPtr get_by_hash_stmt;
StatementPtr delete_expired_stmt;
StatementPtr delete_by_timestamp_stmt;
StatementPtr delete_all_stmt;
StatementPtr update_all_expiries_stmt;
StatementPtr page_count_stmt;
std::vector<std::string> update_all_expiries(
const user_pubkey_t& pubkey, std::chrono::system_clock::time_point new_exp);
} // namespace oxen

File diff suppressed because it is too large Load diff