Refactor how TTL & timestamps are stored

Currently we have an awkward storage of timestamp + ttl + expiry in the
database, and timestamp + ttl passed in from the client (as strings!).
Althis is awkward because we want to be able to shorten the expiry, but
that would mess up TTL.  Additionally everything is stored as
`uint64_t`s, which is messy and not type safe.

This commit makes these changes:

- ttl and timestamp can now be sent by the client as integers (in
addition to the current string value).  NB: this is not reliable until
the entire SN network is on the next SS release.

- ttl, timestamp, and expiry are now type-safe std::chrono types instead
of raw integers (milliseconds for ttl and system_clock::time_points for
the other two).

- ttl is no longer stored: instead we just store timestamp + expiry.
(This will let us update expiry later without worrying about TTL).

- ttl/timestamp value validation is moved out of `common/oxen_common.h`
(which was a very odd place for it) and into request_handler.

- serialization no longer supports message_t; rather message_t is *only*
for holding the value the client gives us.  SS now uses storage::Item
everywhere other than incoming client data.

- Removed TTL/Nonce storage and retrieval

- Fully specify the queries columns instead of using `SELECT *`

- Don't use "`" for identifier quoting inside sqlite.  It's non-standard
MySQL garbage that sqlite3 supported only for MySQL compatibility.
This commit is contained in:
Jason Rhinelander 2021-06-01 19:14:27 -03:00
parent 6b70b3fe71
commit 8993fa093f
14 changed files with 281 additions and 312 deletions

View file

@ -56,20 +56,13 @@ class user_pubkey_t {
}
};
/// message as received by client
/// message as received from client
struct message_t {
std::string pub_key;
std::string data;
std::string hash;
uint64_t ttl;
uint64_t timestamp;
/// Nonce is now meaningless, but we keep it to avoid breaking the protocol
std::string nonce;
message_t(const std::string& pk, const std::string& text,
const std::string& hash, uint64_t ttl, uint64_t timestamp)
: pub_key(pk), data(text), hash(hash), ttl(ttl), timestamp(timestamp) {}
std::chrono::milliseconds ttl;
std::chrono::system_clock::time_point timestamp;
};
using swarm_id_t = uint64_t;

View file

@ -84,6 +84,24 @@ std::string computeMessageHash(std::vector<std::string_view> parts, bool hex) {
return hashResult;
}
bool validateTimestamp(std::chrono::system_clock::time_point timestamp, std::chrono::milliseconds ttl) {
auto now = std::chrono::system_clock::now();
// Timestamp must not be in the future (with some tolerance)
if (timestamp > now + 10s)
return false;
// Don't accept timestamp that has already expired
if (timestamp + ttl < now)
return false;
return true;
}
bool validateTTL(std::chrono::milliseconds ttl) {
// Minimum time to live of 10 seconds, maximum of 14 days
return ttl >= 10s && ttl <= 14 * 24h;
}
RequestHandler::RequestHandler(
ServiceNode& sn,
@ -118,9 +136,8 @@ Response RequestHandler::process_store(const json& params) {
}
}
const auto& ttl = params.at("ttl").get_ref<const std::string&>();
const auto& timestamp =
params.at("timestamp").get_ref<const std::string&>();
const auto& ttl_in = params.at("ttl");
const auto& timestamp_in = params.at("timestamp");
const auto& data = params.at("data").get_ref<const std::string&>();
OXEN_LOG(trace, "Storing message: {}", data);
@ -149,24 +166,39 @@ Response RequestHandler::process_store(const json& params) {
return this->handle_wrong_swarm(pk);
}
uint64_t ttlInt;
if (!util::parseTTL(ttl, ttlInt)) {
OXEN_LOG(debug, "Forbidden. Invalid TTL: {}", ttl);
using namespace std::chrono;
std::optional<milliseconds> ttl;
if (ttl_in.is_number_unsigned())
ttl.emplace(ttl_in.get<uint64_t>());
else if (uint64_t ttlInt; ttl_in.is_string() &&
util::parse_int(ttl_in.get_ref<const std::string&>(), ttlInt))
ttl.emplace(ttlInt);
if (!ttl || !validateTTL(*ttl)) {
OXEN_LOG(debug, "Forbidden. Invalid TTL: {}", ttl_in.dump());
return {http::FORBIDDEN, "Provided TTL is not valid.\n"};
}
uint64_t timestampInt;
if (!util::parseTimestamp(timestamp, ttlInt, timestampInt)) {
OXEN_LOG(debug, "Forbidden. Invalid Timestamp: {}", timestamp);
std::optional<system_clock::time_point> timestamp;
if (timestamp_in.is_number_unsigned())
timestamp.emplace(milliseconds{timestamp_in.get<uint64_t>()});
else if (uint64_t t; timestamp_in.is_string() &&
util::parse_int(ttl_in.get_ref<const std::string&>(), t))
timestamp.emplace(milliseconds{t});
if (!timestamp || !validateTimestamp(*timestamp, *ttl)) {
OXEN_LOG(debug, "Forbidden. Invalid Timestamp: {}", timestamp_in.dump());
return {http::NOT_ACCEPTABLE, "Timestamp error: check your clock\n"};
}
auto messageHash = computeMessageHash({timestamp, ttl, pk.str(), data}, true);
auto messageHash = computeMessageHash({
std::to_string(duration_cast<milliseconds>(timestamp->time_since_epoch()).count()),
std::to_string(ttl->count()),
pk.str(),
data}, true);
bool success;
try {
success = service_node_.process_store({pk.str(), data, messageHash, ttlInt, timestampInt});
success = service_node_.process_store({pk.str(), data, messageHash, *ttl, *timestamp});
} catch (const std::exception& e) {
OXEN_LOG(critical,
"Internal Server Error. Could not store message for {}",
@ -346,8 +378,9 @@ Response RequestHandler::process_retrieve(const json& params) {
for (const auto& item : items) {
json message;
message["hash"] = item.hash;
/// TODO: calculate expiration time once only?
message["expiration"] = item.timestamp + item.ttl;
message["expiration"] = std::chrono::duration_cast<std::chrono::milliseconds>(
item.expiration.time_since_epoch()
).count();
message["data"] = item.data;
messages.push_back(message);
}

View file

@ -41,6 +41,12 @@ std::string to_string(const Response& res);
/// concatenated string parts, and can be returned as either bytes (64 bytes) or hex (128 chars)
std::string computeMessageHash(std::vector<std::string_view> parts, bool hex);
// Validates a TTL value to see if it is acceptable.
bool validateTTL(std::chrono::milliseconds ttl);
// Validates a timestamp to see if it is acceptable. Takes the timestamp and the associated TTL.
bool validateTimestamp(std::chrono::system_clock::time_point timestamp, std::chrono::milliseconds ttl);
struct OnionRequestMetadata {
x25519_pubkey ephem_key;

View file

@ -7,6 +7,7 @@
#include "string_utils.hpp"
#include <boost/endian/conversion.hpp>
#include <chrono>
namespace oxen {
@ -23,25 +24,23 @@ static void serialize(std::string& buf, const std::string& str) {
buf += str;
}
template <typename T>
void serialize_message(std::string& res, const T& msg) {
void serialize_message(std::string& res, const storage::Item& msg) {
/// TODO: use binary / base64 representation for pk
res += msg.pub_key;
serialize(res, msg.hash);
serialize(res, msg.data);
serialize_integer(res, msg.ttl);
serialize_integer(res, msg.timestamp);
serialize(res, msg.nonce);
// For backwards compat, we send expiry as a ttl
serialize_integer<uint64_t>(res, std::chrono::duration_cast<std::chrono::milliseconds>(
msg.expiration - msg.timestamp).count());
serialize_integer<uint64_t>(res, std::chrono::duration_cast<std::chrono::milliseconds>(
msg.timestamp.time_since_epoch()).count());
serialize(res, ""s); // Empty nonce string, no longer used, but serialization currently requires it be here
OXEN_LOG(trace, "serialized message: {}", msg.data);
}
template void serialize_message(std::string& res, const message_t& msg);
template void serialize_message(std::string& res, const Item& msg);
template <typename T>
std::vector<std::string> serialize_messages(const std::vector<T>& msgs) {
std::vector<std::string> serialize_messages(const std::vector<storage::Item>& msgs) {
std::vector<std::string> res;
res.emplace_back();
@ -55,12 +54,6 @@ std::vector<std::string> serialize_messages(const std::vector<T>& msgs) {
return res;
}
template std::vector<std::string>
serialize_messages(const std::vector<message_t>& msgs);
template std::vector<std::string>
serialize_messages(const std::vector<Item>& msgs);
template <typename T>
static std::optional<T> deserialize_integer(std::string_view& slice) {
static_assert(std::is_trivial_v<T>);
@ -92,49 +85,58 @@ static std::optional<std::string> deserialize_string(std::string_view& slice) {
return std::nullopt;
}
std::vector<message_t> deserialize_messages(std::string_view slice) {
std::vector<storage::Item> deserialize_messages(std::string_view slice) {
OXEN_LOG(trace, "=== Deserializing ===");
std::vector<message_t> result;
std::vector<storage::Item> result;
while (!slice.empty()) {
auto& item = result.emplace_back();
/// Deserialize PK
auto pk = deserialize_string(slice, oxen::get_user_pubkey_size());
if (!pk) {
if (auto pk = deserialize_string(slice, oxen::get_user_pubkey_size()))
item.pub_key = std::move(*pk);
else {
OXEN_LOG(debug, "Could not deserialize pk");
return {};
}
/// Deserialize Hash
auto hash = deserialize_string(slice);
if (!hash) {
if (auto hash = deserialize_string(slice))
item.hash = std::move(*hash);
else {
OXEN_LOG(debug, "Could not deserialize hash");
return {};
}
/// Deserialize Data
auto data = deserialize_string(slice);
if (!data) {
if (auto data = deserialize_string(slice))
item.data = std::move(*data);
else {
OXEN_LOG(debug, "Could not deserialize data");
return {};
}
/// Deserialize TTL
auto ttl = deserialize_integer<uint64_t>(slice);
if (!ttl) {
std::chrono::milliseconds ttl;
if (auto ttl_ms = deserialize_integer<uint64_t>(slice))
ttl = std::chrono::milliseconds{*ttl_ms};
else {
OXEN_LOG(debug, "Could not deserialize ttl");
return {};
}
/// Deserialize Timestamp
auto timestamp = deserialize_integer<uint64_t>(slice);
if (!timestamp) {
if (auto timestamp = deserialize_integer<uint64_t>(slice))
item.timestamp = std::chrono::system_clock::time_point{std::chrono::milliseconds{*timestamp}};
else {
OXEN_LOG(debug, "Could not deserialize timestamp");
return {};
}
item.expiration = item.timestamp + ttl;
/// Deserialize Nonce
/// TODO: Nonce is unused but we have to call this for backwards compat (and if we don't
/// pull it off the string we can't read the next element). It would be good to complete
@ -144,11 +146,7 @@ std::vector<message_t> deserialize_messages(std::string_view slice) {
/// values as hex, and using a rigid fixed ordering of fields.
[[maybe_unused]] auto unused_nonce = deserialize_string(slice);
OXEN_LOG(trace, "Deserialized data: {}", *data);
OXEN_LOG(trace, "pk: {}, msg: {}", *pk, *data);
result.emplace_back(std::move(*pk), std::move(*data), std::move(*hash), *ttl, *timestamp);
OXEN_LOG(trace, "pk: {}, msg: {}", item.pub_key, item.data);
}
OXEN_LOG(trace, "=== END ===");

View file

@ -13,12 +13,10 @@ struct message_t;
inline constexpr size_t SERIALIZATION_BATCH_SIZE = 9'000'000;
template <typename T>
void serialize_message(std::string& buf, const T& msg);
void serialize_message(std::string& buf, const storage::Item& msg);
template <typename T>
std::vector<std::string> serialize_messages(const std::vector<T>& msgs);
std::vector<std::string> serialize_messages(const std::vector<storage::Item>& msgs);
std::vector<message_t> deserialize_messages(std::string_view blob);
std::vector<storage::Item> deserialize_messages(std::string_view blob);
} // namespace oxen

View file

@ -353,11 +353,12 @@ bool ServiceNode::process_store(const message_t& msg) {
all_stats_.bump_store_requests();
/// store in the database
save_if_new(msg);
/// store in the database (if not already present)
if (db_->store(msg))
OXEN_LOG(trace, "saved message: {}", msg.data);
std::string serialized;
serialize_message(serialized, msg);
serialize_message(serialized, Item{msg});
for (auto& peer : swarm_->other_nodes())
relay_data_reliable(serialized, peer);
@ -366,16 +367,6 @@ bool ServiceNode::process_store(const message_t& msg) {
return true;
}
void ServiceNode::save_if_new(const message_t& msg) {
std::lock_guard guard(sn_mutex_);
if (db_->store(msg.hash, msg.pub_key, msg.data, msg.ttl, msg.timestamp,
msg.nonce)) {
OXEN_LOG(trace, "saved message: {}", msg.data);
}
}
void ServiceNode::save_bulk(const std::vector<Item>& items) {
std::lock_guard guard(sn_mutex_);
@ -1143,11 +1134,12 @@ void ServiceNode::initiate_peer_test() {
}
void ServiceNode::bootstrap_peers(const std::vector<sn_record_t>& peers) const {
std::vector<Item> all_entries;
this->get_all_messages(all_entries);
this->relay_messages(all_entries, peers);
if (!get_all_messages(all_entries)) {
OXEN_LOG(err, "Could not retrieve entries from the database");
return;
}
relay_messages(all_entries, peers);
}
void ServiceNode::bootstrap_swarms(
@ -1226,8 +1218,7 @@ void ServiceNode::bootstrap_swarms(
}
}
template <typename Message>
void ServiceNode::relay_messages(const std::vector<Message>& messages,
void ServiceNode::relay_messages(const std::vector<storage::Item>& messages,
const std::vector<sn_record_t>& snodes) const {
std::vector<std::string> batches = serialize_messages(messages);
@ -1390,26 +1381,14 @@ void ServiceNode::process_push_batch(const std::string& blob) {
if (blob.empty())
return;
std::vector<message_t> messages = deserialize_messages(blob);
std::vector<storage::Item> items = deserialize_messages(blob);
OXEN_LOG(trace, "Saving all: begin");
OXEN_LOG(debug, "Got {} messages from peers, size: {}", messages.size(),
OXEN_LOG(debug, "Got {} messages from peers, size: {}", items.size(),
blob.size());
std::vector<Item> items;
items.reserve(messages.size());
// TODO: avoid copying m.data
// Promoting message_t to Item:
std::transform(messages.begin(), messages.end(), std::back_inserter(items),
[](const message_t& m) {
return Item{m.hash, m.pub_key, m.timestamp,
m.ttl, m.timestamp + m.ttl, m.nonce,
m.data};
});
this->save_bulk(items);
save_bulk(items);
OXEN_LOG(trace, "Saving all: end");
}

View file

@ -95,9 +95,7 @@ class ServiceNode {
std::forward_list<std::future<void>> outstanding_https_reqs_;
void save_if_new(const message_t& msg);
// Save items to the database, notifying listeners as necessary
// Save multiple items to the database at once (i.e. in a single transaction)
void save_bulk(const std::vector<storage::Item>& items);
void on_bootstrap_update(block_update_t&& bu);
@ -120,9 +118,8 @@ class ServiceNode {
relay_data_reliable(const std::string& blob,
const sn_record_t& address) const; // mutex not needed
template <typename Message>
void relay_messages(
const std::vector<Message>& messages,
const std::vector<storage::Item>& items,
const std::vector<sn_record_t>& snodes) const; // mutex not needed
// Conducts any ping peer tests that are due; (this is designed to be called frequently and does

View file

@ -3,6 +3,7 @@
#include "Item.hpp"
#include "oxen_common.h"
#include <chrono>
#include <cstdint>
#include <filesystem>
#include <iostream>
@ -32,10 +33,21 @@ class Database {
enum class DuplicateHandling { IGNORE, FAIL };
bool store(const std::string& hash, const std::string& pubKey,
const std::string& bytes, uint64_t ttl, uint64_t timestamp,
const std::string& nonce,
DuplicateHandling behaviour = DuplicateHandling::FAIL);
bool store(std::string_view hash, std::string_view pubKey, std::string_view bytes,
std::chrono::system_clock::time_point timestamp, std::chrono::system_clock::time_point expiry,
DuplicateHandling behaviour = DuplicateHandling::FAIL);
bool store(std::string_view hash, std::string_view pubKey, std::string_view bytes,
std::chrono::milliseconds ttl, std::chrono::system_clock::time_point timestamp,
DuplicateHandling behaviour = DuplicateHandling::FAIL) {
return store(hash, pubKey, bytes, timestamp, timestamp + ttl, behaviour);
}
bool store(const storage::Item& item, DuplicateHandling behaviour = DuplicateHandling::FAIL) {
return store(item.hash, item.pub_key, item.data, item.timestamp, item.expiration, behaviour);
}
bool store(const message_t& msg, DuplicateHandling behaviour = DuplicateHandling::FAIL) {
return store(msg.hash, msg.pub_key, msg.data, msg.timestamp, msg.timestamp + msg.ttl, behaviour);
}
bool bulk_store(const std::vector<storage::Item>& items);

View file

@ -1,28 +1,52 @@
#pragma once
#include <cstdint>
#include <chrono>
#include <string>
#include "oxen_common.h"
namespace oxen {
namespace storage {
namespace oxen::storage {
struct Item {
Item(const std::string& hash, const std::string& pubKey, uint64_t timestamp,
uint64_t ttl, uint64_t expirationTimestamp, const std::string& nonce,
const std::string& bytes)
: hash(hash), pub_key(pubKey), timestamp(timestamp), ttl(ttl),
expiration_timestamp(expirationTimestamp), nonce(nonce), data(bytes) {
}
Item() = default;
std::string hash;
std::string pub_key;
uint64_t timestamp;
uint64_t ttl;
uint64_t expiration_timestamp;
std::string nonce;
std::chrono::system_clock::time_point timestamp;
std::chrono::system_clock::time_point expiration;
std::string data;
Item() = default;
Item(
std::string hash,
std::string pub_key,
std::chrono::system_clock::time_point timestamp,
std::chrono::system_clock::time_point expiration,
std::string data) :
hash{std::move(hash)},
pub_key{std::move(pub_key)},
timestamp{std::move(timestamp)},
expiration{std::move(expiration)},
data{std::move(data)}
{}
// Explicit conversion from a message_t
explicit Item(message_t&& msg) :
hash{std::move(msg.hash)}, pub_key{std::move(msg.pub_key)}, timestamp{std::move(msg.timestamp)},
expiration{timestamp + msg.ttl}, data{std::move(msg.data)}
{}
explicit Item(const message_t& msg) :
hash{msg.hash}, pub_key{msg.pub_key}, timestamp{msg.timestamp}, expiration{timestamp + msg.ttl}, data{msg.data}
{}
// Explicit conversion to a message_t
explicit operator message_t() const & {
return {pub_key, data, hash,
std::chrono::duration_cast<std::chrono::milliseconds>(expiration - timestamp),
timestamp};
}
explicit operator message_t() && {
return {std::move(pub_key), std::move(data), std::move(hash),
std::chrono::duration_cast<std::chrono::milliseconds>(expiration - timestamp),
std::move(timestamp)};
}
};
} // namespace storage
} // namespace oxen
} // namespace oxen::storage

View file

@ -3,6 +3,7 @@
#include "utils.hpp"
#include "sqlite3.h"
#include <chrono>
#include <cstdlib>
#include <exception>
@ -30,9 +31,9 @@ Database::Database(const std::filesystem::path& db_path) {
}
void Database::clean_expired() {
const auto now_ms = util::get_time_ms();
sqlite3_bind_int64(delete_expired_stmt, 1, now_ms);
using namespace std::chrono;
sqlite3_bind_int64(delete_expired_stmt, 1, duration_cast<milliseconds>(
system_clock::now().time_since_epoch()).count());
int rc;
while (true) {
@ -160,17 +161,17 @@ void Database::open_and_prepare(const std::filesystem::path& db_path) {
set_page_count(db);
const char* create_table_query =
"CREATE TABLE IF NOT EXISTS `Data`("
" `Hash` VARCHAR(128) NOT NULL,"
" `Owner` VARCHAR(256) NOT NULL,"
" `TTL` INTEGER NOT NULL,"
" `Timestamp` INTEGER NOT NULL,"
" `TimeExpires` INTEGER NOT NULL,"
" `Nonce` VARCHAR(128) NOT NULL,"
" `Data` BLOB"
"CREATE TABLE IF NOT EXISTS Data("
" Hash VARCHAR(128) NOT NULL,"
" Owner VARCHAR(256) NOT NULL,"
" TTL INTEGER NOT NULL," // No longer used; TODO: nuke this the next time we do a table migration
" Timestamp INTEGER NOT NULL,"
" TimeExpires INTEGER NOT NULL,"
" Nonce VARCHAR(128) NOT NULL," // No longer used; TODO: nuke this field the next time we do a table migration
" Data BLOB"
");"
"CREATE UNIQUE INDEX IF NOT EXISTS `idx_data_hash` ON `Data` (`Hash`);"
"CREATE INDEX IF NOT EXISTS `idx_data_owner` on `Data` ('Owner');";
"CREATE UNIQUE INDEX IF NOT EXISTS idx_data_hash ON Data(Hash);"
"CREATE INDEX IF NOT EXISTS idx_data_owner on Data('Owner');";
char* errMsg = nullptr;
rc = sqlite3_exec(db, create_table_query, nullptr, nullptr, &errMsg);
@ -182,52 +183,65 @@ void Database::open_and_prepare(const std::filesystem::path& db_path) {
throw std::runtime_error("Can't create table");
}
save_stmt = prepare_statement(
"INSERT INTO Data "
"(Hash, Owner, TTL, Timestamp, TimeExpires, Nonce, Data)"
"VALUES (?,?,?,?,?,?,?);");
save_stmt = prepare_statement(R"(
INSERT INTO Data
(Hash, Owner, Timestamp, TimeExpires, Data, TTL, Nonce)
VALUES (?,?,?,?,?,0,''))");
if (!save_stmt)
throw std::runtime_error("could not prepare the save statement");
save_or_ignore_stmt = prepare_statement(
"INSERT OR IGNORE INTO Data "
"(Hash, Owner, TTL, Timestamp, TimeExpires, Nonce, Data)"
"VALUES (?,?,?,?,?,?,?)");
save_or_ignore_stmt = prepare_statement(R"(
INSERT OR IGNORE INTO Data
(Hash, Owner, Timestamp, TimeExpires, Data, TTL, Nonce)
VALUES (?,?,?,?,?,0,''))");
if (!save_or_ignore_stmt)
throw std::runtime_error("could not prepare the bulk save statement");
get_all_for_pk_stmt = prepare_statement(
"SELECT * FROM Data WHERE `Owner` = ? ORDER BY rowid LIMIT ?;");
get_all_for_pk_stmt = prepare_statement(R"(
SELECT Hash, Owner, Timestamp, TimeExpires, Data
FROM Data
WHERE Owner = ?
ORDER BY rowid LIMIT ?)");
if (!get_all_for_pk_stmt)
throw std::runtime_error(
"could not prepare the get all for pk statement");
get_all_stmt = prepare_statement("SELECT * FROM Data ORDER BY rowid;");
get_all_stmt = prepare_statement(R"(
SELECT Hash, Owner, Timestamp, TimeExpires, Data
FROM Data
ORDER BY rowid)");
if (!get_all_stmt)
throw std::runtime_error("could not prepare the get all statement");
get_stmt =
prepare_statement("SELECT * FROM `Data` WHERE `Owner` == ? AND rowid >"
"COALESCE((SELECT `rowid` FROM `Data` WHERE `Hash` = "
"?), 0) ORDER BY rowid LIMIT ?;");
get_stmt = prepare_statement(R"(
SELECT Hash, Owner, Timestamp, TimeExpires, Data
FROM Data
WHERE Owner = ? AND rowid > COALESCE((SELECT rowid FROM Data WHERE Hash = ?), 0)
ORDER BY rowid
LIMIT ?)");
if (!get_stmt)
throw std::runtime_error("could not prepare get statement");
get_row_count_stmt = prepare_statement("SELECT count(*) FROM `Data`;");
get_row_count_stmt = prepare_statement("SELECT COUNT(*) FROM Data");
if (!get_row_count_stmt)
throw std::runtime_error("could not prepare row count statement");
get_random_stmt = prepare_statement("SELECT * FROM Data WHERE rowid = (SELECT rowid FROM Data ORDER BY RANDOM() LIMIT 1)");
get_random_stmt = prepare_statement(R"(
SELECT Hash, Owner, Timestamp, TimeExpires, Data
FROM Data
WHERE rowid = (SELECT rowid FROM Data ORDER BY RANDOM() LIMIT 1))");
if (!get_random_stmt)
throw std::runtime_error("could not prepare get random statement");
get_by_hash_stmt =
prepare_statement("SELECT * FROM `Data` WHERE `Hash` = ?;");
get_by_hash_stmt = prepare_statement(R"(
SELECT Hash, Owner, Timestamp, TimeExpires, Data
FROM Data
WHERE Hash = ?)");
if (!get_by_hash_stmt)
throw std::runtime_error("could not prepare get by hash statement");
delete_expired_stmt =
prepare_statement("DELETE FROM `Data` WHERE `TimeExpires` <= ?");
prepare_statement("DELETE FROM Data WHERE TimeExpires <= ?");
if (!delete_expired_stmt)
throw std::runtime_error(
"could not prepare 'delete expired' statement");
@ -297,19 +311,14 @@ bool Database::get_message_count(uint64_t& count) {
/// Extract item from the result of a successfull select statement execution
static Item extract_item(sqlite3_stmt* stmt) {
using namespace std::chrono;
Item item;
// "If the SQL statement does not currently point to a valid row, or if the
// column index is out of range, the result is undefined"
item.hash = std::string((const char*)sqlite3_column_text(stmt, 0));
item.pub_key = std::string((const char*)sqlite3_column_text(stmt, 1));
item.ttl = sqlite3_column_int64(stmt, 2);
item.timestamp = sqlite3_column_int64(stmt, 3);
item.expiration_timestamp = sqlite3_column_int64(stmt, 4);
item.nonce = std::string((const char*)sqlite3_column_text(stmt, 5));
item.data = std::string((char*)sqlite3_column_blob(stmt, 6),
sqlite3_column_bytes(stmt, 6));
item.hash = reinterpret_cast<const char*>(sqlite3_column_text(stmt, 0));
item.pub_key = reinterpret_cast<const char*>(sqlite3_column_text(stmt, 1));
item.timestamp = system_clock::time_point{milliseconds{sqlite3_column_int64(stmt, 2)}};
item.expiration = system_clock::time_point{milliseconds{sqlite3_column_int64(stmt, 3)}};
item.data = std::string(reinterpret_cast<const char*>(sqlite3_column_blob(stmt, 4)),
sqlite3_column_bytes(stmt, 4));
return item;
}
@ -344,9 +353,9 @@ bool Database::retrieve_random(Item& item) {
return success;
}
bool Database::retrieve_by_hash(const std::string& msg_hash, Item& item) {
bool Database::retrieve_by_hash(std::string_view msg_hash, Item& item) {
sqlite3_bind_text(get_by_hash_stmt, 1, msg_hash.c_str(), -1, SQLITE_STATIC);
sqlite3_bind_text(get_by_hash_stmt, 1, msg_hash.data(), msg_hash.size(), SQLITE_STATIC);
bool success = false;
int rc;
@ -379,28 +388,26 @@ bool Database::retrieve_by_hash(const std::string& msg_hash, Item& item) {
return success;
}
bool Database::store(const std::string& hash, const std::string& pubKey,
const std::string& bytes, uint64_t ttl, uint64_t timestamp,
const std::string& nonce,
DuplicateHandling duplicateHandling) {
const auto exp_time = timestamp + ttl;
bool Database::store(
std::string_view hash,
std::string_view pubKey,
std::string_view bytes,
std::chrono::system_clock::time_point timestamp,
std::chrono::system_clock::time_point expiry,
DuplicateHandling duplicateHandling) {
sqlite3_stmt* stmt = duplicateHandling == DuplicateHandling::IGNORE
? save_or_ignore_stmt
: save_stmt;
// TODO: bind can return errors, handle them
sqlite3_bind_text(stmt, 1, hash.c_str(), -1, SQLITE_STATIC);
sqlite3_bind_text(stmt, 2, pubKey.c_str(), -1, SQLITE_STATIC);
sqlite3_bind_int64(stmt, 3, ttl);
sqlite3_bind_int64(stmt, 4, timestamp);
sqlite3_bind_int64(stmt, 5, exp_time);
sqlite3_bind_blob(stmt, 6, nonce.data(), nonce.size(), SQLITE_STATIC);
sqlite3_bind_blob(stmt, 7, bytes.data(), bytes.size(), SQLITE_STATIC);
sqlite3_bind_text(stmt, 1, hash.data(), hash.size(), SQLITE_STATIC);
sqlite3_bind_text(stmt, 2, pubKey.data(), pubKey.size(), SQLITE_STATIC);
using namespace std::chrono;
sqlite3_bind_int64(stmt, 3, duration_cast<milliseconds>(timestamp.time_since_epoch()).count());
sqlite3_bind_int64(stmt, 4, duration_cast<milliseconds>(expiry.time_since_epoch()).count());
sqlite3_bind_blob(stmt, 5, bytes.data(), bytes.size(), SQLITE_STATIC);
// keep track of db full errorss so we don't print them on every store
static int db_full_counter = 0;
// print the error once so many errors
constexpr int DB_FULL_FREQUENCY = 100;
@ -444,10 +451,8 @@ bool Database::bulk_store(const std::vector<Item>& items) {
}
try {
for (const auto& item : items) {
store(item.hash, item.pub_key, item.data, item.ttl, item.timestamp,
item.nonce, DuplicateHandling::IGNORE);
}
for (const auto& item : items)
store(item, DuplicateHandling::IGNORE);
} catch (...) {
OXEN_LOG(err, "Failed to store items during bulk operation");
}

View file

@ -3,9 +3,11 @@
#include <catch2/catch.hpp>
#include <chrono>
#include <string>
using namespace oxen;
using oxen::storage::Item;
TEST_CASE("serialization - basic values", "[serialization]") {
@ -13,11 +15,11 @@ TEST_CASE("serialization - basic values", "[serialization]") {
"054368520005786b249bcd461d28f75e560ea794014eeb17fcf6003f37d876783e"s;
const auto data = "data";
const auto hash = "hash";
const uint64_t timestamp = 12345678;
const uint64_t ttl = 3456000;
const std::chrono::system_clock::time_point timestamp{12'345'678ms};
const auto ttl = 3456s;
message_t msg{pub_key, data, hash, ttl, timestamp};
std::string msg_serialized;
serialize_message(msg_serialized, msg);
serialize_message(msg_serialized, Item{msg});
const auto expected_serialized = oxenmq::to_hex(pub_key) +
"040000000000000068617368" // size+hash
"040000000000000064617461" // size+data
@ -25,7 +27,7 @@ TEST_CASE("serialization - basic values", "[serialization]") {
"4e61bc0000000000" // timestamp
"0000000000000000"s; // nonce
CHECK(oxenmq::to_hex(msg_serialized) == expected_serialized);
const std::vector<message_t> inputs{msg, msg};
const std::vector<Item> inputs{Item{msg}, Item{msg}};
const std::vector<std::string> batches = serialize_messages(inputs);
CHECK(batches.size() == 1);
CHECK(oxenmq::to_hex(batches[0]) == expected_serialized + expected_serialized);
@ -37,7 +39,7 @@ TEST_CASE("serialization - basic values", "[serialization]") {
CHECK(messages[i].data == data);
CHECK(messages[i].hash == hash);
CHECK(messages[i].timestamp == timestamp);
CHECK(messages[i].ttl == ttl);
CHECK(messages[i].expiration == timestamp + ttl);
}
}
@ -46,14 +48,14 @@ TEST_CASE("serialization - batch serialization", "[serialization]") {
"054368520005786b249bcd461d28f75e560ea794014eeb17fcf6003f37d876783e";
std::string data(100000, 'x');
const auto hash = "hash";
const uint64_t timestamp = 12345678;
const uint64_t ttl = 3456000;
const std::chrono::system_clock::time_point timestamp{1'622'576'077s};
const auto ttl = 24h;
message_t msg{pub_key, data, hash, ttl, timestamp};
std::string buffer;
serialize_message(buffer, msg);
serialize_message(buffer, Item{msg});
const size_t num_messages = (SERIALIZATION_BATCH_SIZE / buffer.size()) + 1;
std::vector<message_t> inputs(num_messages, msg);
std::vector<Item> inputs(num_messages, Item{msg});
CHECK(serialize_messages(inputs).size() == 1);
inputs.push_back(msg);
inputs.push_back(Item{msg});
CHECK(serialize_messages(inputs).size() == 2);
}

View file

@ -41,12 +41,11 @@ TEST_CASE("storage - data persistence", "[storage]") {
const auto hash = "myhash";
const auto pubkey = "mypubkey";
const auto bytes = "bytesasstring";
const auto nonce = "nonce";
const uint64_t ttl = 123456;
const uint64_t timestamp = util::get_time_ms();
const auto ttl = 123456ms;
const auto now = std::chrono::system_clock::now();
{
Database storage{"."};
CHECK(storage.store(hash, pubkey, bytes, ttl, timestamp, nonce));
CHECK(storage.store(hash, pubkey, bytes, ttl, now));
// the database is closed when storage goes out of scope
}
{
@ -58,10 +57,10 @@ TEST_CASE("storage - data persistence", "[storage]") {
CHECK(storage.retrieve(pubkey, items, lastHash));
CHECK(items.size() == 1);
REQUIRE(items.size() == 1);
CHECK(items[0].pub_key == pubkey);
CHECK(items[0].hash == hash);
CHECK(items[0].expiration_timestamp - items[0].timestamp == ttl);
CHECK(items[0].expiration - items[0].timestamp == ttl);
CHECK(items[0].data == bytes);
}
}
@ -72,15 +71,14 @@ TEST_CASE("storage - returns false when storing existing hash", "[storage]") {
const auto hash = "myhash";
const auto pubkey = "mypubkey";
const auto bytes = "bytesasstring";
const auto nonce = "nonce";
const uint64_t ttl = 123456;
const uint64_t timestamp = util::get_time_ms();
const auto ttl = 123456ms;
const auto timestamp = std::chrono::system_clock::now();
Database storage{"."};
CHECK(storage.store(hash, pubkey, bytes, ttl, timestamp, nonce));
CHECK(storage.store(hash, pubkey, bytes, ttl, timestamp));
// store using the same hash, FAIL is default behaviour
CHECK_FALSE(storage.store(hash, pubkey, bytes, ttl, timestamp, nonce,
CHECK_FALSE(storage.store(hash, pubkey, bytes, ttl, timestamp,
Database::DuplicateHandling::FAIL));
}
@ -90,15 +88,14 @@ TEST_CASE("storage - returns true when storing existing with ignore constraint",
const auto hash = "myhash";
const auto pubkey = "mypubkey";
const auto bytes = "bytesasstring";
const auto nonce = "nonce";
const uint64_t ttl = 123456;
const uint64_t timestamp = util::get_time_ms();
const auto ttl = 123456ms;
const auto timestamp = std::chrono::system_clock::now();
Database storage{"."};
CHECK(storage.store(hash, pubkey, bytes, ttl, timestamp, nonce));
CHECK(storage.store(hash, pubkey, bytes, ttl, timestamp));
// store using the same hash
CHECK(storage.store(hash, pubkey, bytes, ttl, timestamp, nonce,
CHECK(storage.store(hash, pubkey, bytes, ttl, timestamp,
Database::DuplicateHandling::IGNORE));
}
@ -107,16 +104,15 @@ TEST_CASE("storage - only return entries for specified pubkey", "[storage]") {
Database storage{"."};
CHECK(storage.store("hash0", "mypubkey", "bytesasstring0", 100000,
util::get_time_ms(), "nonce"));
CHECK(storage.store("hash1", "otherpubkey", "bytesasstring1", 100000,
util::get_time_ms(), "nonce"));
auto now = std::chrono::system_clock::now();
CHECK(storage.store("hash0", "mypubkey", "bytesasstring0", 100s, now));
CHECK(storage.store("hash1", "otherpubkey", "bytesasstring1", 100s, now));
{
std::vector<Item> items;
const auto lastHash = "";
CHECK(storage.retrieve("mypubkey", items, lastHash));
CHECK(items.size() == 1);
REQUIRE(items.size() == 1);
CHECK(items[0].hash == "hash0");
}
@ -124,7 +120,7 @@ TEST_CASE("storage - only return entries for specified pubkey", "[storage]") {
std::vector<Item> items;
const auto lastHash = "";
CHECK(storage.retrieve("otherpubkey", items, lastHash));
CHECK(items.size() == 1);
REQUIRE(items.size() == 1);
CHECK(items[0].hash == "hash1");
}
}
@ -134,18 +130,18 @@ TEST_CASE("storage - return entries older than lasthash", "[storage]") {
Database storage{"."};
auto now = std::chrono::system_clock::now();
const size_t num_entries = 100;
for (size_t i = 0; i < num_entries; i++) {
const auto hash = std::string("hash") + std::to_string(i);
storage.store(hash, "mypubkey", "bytesasstring", 100000,
util::get_time_ms(), "nonce");
storage.store(hash, "mypubkey", "bytesasstring", 100s, now);
}
{
std::vector<Item> items;
const auto lastHash = "hash0";
CHECK(storage.retrieve("mypubkey", items, lastHash));
CHECK(items.size() == num_entries - 1);
REQUIRE(items.size() == num_entries - 1);
CHECK(items[0].hash == "hash1");
}
@ -154,7 +150,7 @@ TEST_CASE("storage - return entries older than lasthash", "[storage]") {
const auto lastHash =
std::string("hash") + std::to_string(num_entries / 2 - 1);
CHECK(storage.retrieve("mypubkey", items, lastHash));
CHECK(items.size() == num_entries / 2);
REQUIRE(items.size() == num_entries / 2);
CHECK(items[0].hash == "hash" + std::to_string(num_entries / 2));
}
}
@ -166,15 +162,14 @@ TEST_CASE("storage - remove expired entries", "[storage]") {
Database storage{"."};
CHECK(storage.store("hash0", pubkey, "bytesasstring0", 100000,
util::get_time_ms(), "nonce"));
CHECK(storage.store("hash1", pubkey, "bytesasstring0", 0,
util::get_time_ms(), "nonce"));
auto now = std::chrono::system_clock::now();
CHECK(storage.store("hash0", pubkey, "bytesasstring0", 1s, now));
CHECK(storage.store("hash1", pubkey, "bytesasstring0", 0s, now));
{
std::vector<Item> items;
const auto lastHash = "";
CHECK(storage.retrieve(pubkey, items, lastHash));
CHECK(items.size() == 2);
REQUIRE(items.size() == 2);
}
std::this_thread::sleep_for(5ms);
storage.clean_expired();
@ -182,7 +177,7 @@ TEST_CASE("storage - remove expired entries", "[storage]") {
std::vector<Item> items;
const auto lastHash = "";
CHECK(storage.retrieve(pubkey, items, lastHash));
CHECK(items.size() == 1);
REQUIRE(items.size() == 1);
CHECK(items[0].hash == "hash0");
}
}
@ -192,9 +187,8 @@ TEST_CASE("storage - bulk data storage", "[storage]") {
const auto pubkey = "mypubkey";
const auto bytes = "bytesasstring";
const auto nonce = "nonce";
const uint64_t ttl = 123456;
const uint64_t timestamp = util::get_time_ms();
const auto ttl = 123456ms;
const auto timestamp = std::chrono::system_clock::now();
const size_t num_items = 100;
@ -204,8 +198,8 @@ TEST_CASE("storage - bulk data storage", "[storage]") {
{
std::vector<Item> items;
for (int i = 0; i < num_items; ++i) {
items.push_back({std::to_string(i), pubkey, timestamp, ttl,
timestamp + ttl, nonce, bytes});
items.emplace_back(std::to_string(i), pubkey, timestamp,
timestamp + ttl, bytes);
}
CHECK(storage.bulk_store(items));
@ -225,23 +219,21 @@ TEST_CASE("storage - bulk storage with overlap", "[storage]") {
const auto pubkey = "mypubkey";
const auto bytes = "bytesasstring";
const auto nonce = "nonce";
const uint64_t ttl = 123456;
const uint64_t timestamp = util::get_time_ms();
const auto ttl = 123456ms;
const auto timestamp = std::chrono::system_clock::now();
const size_t num_items = 100;
Database storage{"."};
// insert existing
CHECK(storage.store("0", pubkey, bytes, ttl, timestamp, nonce));
CHECK(storage.store("0", pubkey, bytes, ttl, timestamp));
// bulk store
{
std::vector<Item> items;
for (int i = 0; i < num_items; ++i) {
items.push_back({std::to_string(i), pubkey, timestamp, ttl,
timestamp + ttl, nonce, bytes});
items.emplace_back(std::to_string(i), pubkey, timestamp, timestamp + ttl, bytes);
}
CHECK(storage.bulk_store(items));
@ -261,11 +253,11 @@ TEST_CASE("storage - retrieve limit", "[storage]") {
Database storage{"."};
auto now = std::chrono::system_clock::now();
const size_t num_entries = 100;
for (size_t i = 0; i < num_entries; i++) {
const auto hash = std::string("hash") + std::to_string(i);
storage.store(hash, "mypubkey", "bytesasstring", 100000,
util::get_time_ms(), "nonce");
storage.store(hash, "mypubkey", "bytesasstring", 100s, now);
}
// should return all items

View file

@ -2,25 +2,11 @@
#include <cstdint>
#include <filesystem>
#include <iosfwd>
#include <optional>
#include <random>
#include <string>
namespace util {
bool validateTTL(uint64_t ttlInt);
// Convert ttl string into uint64_t, return bool for success/fail
bool parseTTL(const std::string& ttlString, uint64_t& ttl);
bool validateTimestamp(uint64_t timestamp, uint64_t ttl);
// Convert timestamp string into uint64_t, return bool for success/fail
bool parseTimestamp(const std::string& timestampString, const uint64_t ttl,
uint64_t& timestamp);
// Get current time in milliseconds
uint64_t get_time_ms();
/// Returns a reference to a randomly seeded, thread-local RNG.
std::mt19937_64& rng();

View file

@ -1,6 +1,5 @@
#include "utils.hpp"
#include <chrono>
#include <cstring>
#ifndef _WIN32
@ -13,61 +12,6 @@ extern "C" {
namespace util {
uint64_t get_time_ms() {
const auto timestamp = std::chrono::system_clock::now();
return std::chrono::duration_cast<std::chrono::milliseconds>(
timestamp.time_since_epoch())
.count();
}
bool validateTimestamp(uint64_t timestamp, uint64_t ttl) {
const uint64_t cur_time = get_time_ms();
// Timestamp must not be in the future (with some tolerance)
if (timestamp > cur_time + 10000)
return false;
// Don't need to worry about overflow for several hundred million years
const uint64_t exp_time = timestamp + ttl;
// Don't accept timestamp that has already expired
if (exp_time < cur_time)
return false;
return true;
}
bool parseTimestamp(const std::string& timestampString, const uint64_t ttl,
uint64_t& timestamp) {
try {
timestamp = std::stoull(timestampString);
} catch (...) {
return false;
}
return validateTimestamp(timestamp, ttl);
}
bool validateTTL(uint64_t ttlInt) {
// Minimum time to live of 10 seconds, maximum of 14 days
return (ttlInt >= 10 * 1000 && ttlInt <= 14 * 24 * 60 * 60 * 1000);
}
bool parseTTL(const std::string& ttlString, uint64_t& ttl) {
int ttlInt;
try {
ttlInt = std::stoi(ttlString);
} catch (...) {
return false;
}
if (!validateTTL(ttlInt))
return false;
ttl = static_cast<uint64_t>(ttlInt);
return true;
}
std::mt19937_64& rng() {
static thread_local std::mt19937_64 generator{std::random_device{}()};
return generator;