oxen-storage-server/httpserver/request_handler.cpp
Jason Rhinelander 8d34f76002 Storage server refactoring & ping reporting redesign
Refactoring:

- Overhaul how pubkeys are stored: rather than storing them as strings in multiple forms (hex,
  binary, base32z) we now store each pubkey type (legacy, ed25519, x25519) in a type-safe container
  that extends std::array, giving us allocation-free storage for them.
  - Do conversion into these types early on so that *most* of the code takes type-safe pubkeys
    rather than strings in random formats; thus making the public API (e.g. HTTP request parser)
    deal with encoding/decoding, rather than making it happen deep down.  (For example,
    ChannelEncryption shouldn't need to worry about decoding a hex string).  This was pretty messy:
    some code wanted hex, some base32z, some base32z with ".snode" appended, some base64.
- When printing pubkeys in logs, print them as hex, not base32z.  Base32z was really hard to
  reconcile with pubkeys because everywhere *else* we show them as hex.
- Replace sn_record_t with a much lighter type: it is now a very simple struct with just the members
  it needs (ip, ports, pubkeys), and is no longer hashable (instead we can hash on the
  .legacy_pubkey member).
  - Simplify some interfaces taking multiple values from sn_record_t by just passing the sn_record_t
- Moved a bunch of things that were in the global namespace into the oxen namespace.
- Simplify swarm storage and lookup: we previously had various methods that did a linear scan over
  all active nodes, and could do string-based searching for different representations of the pubkey
  (hex, base32z, etc.).  Replace it all with a simple structure of:
    unordered_map<legacy_pubkey, sn_record_t>
    unordered_map<ed25519_pubkey, legacy_pubkey>
    unordered_map<x25519_pubkey, legacy_pubkey>
  where the first map holds the entries and the latter two point us to the key in the first one.
- De-templatize ChannelEncryption, and make it take pubkey types rather than strings.  (The template
  was doing nothing as it was only ever used with T=std::string).
- Fix a leak in ChannelEncryption CBC decryption if it throws (the context would leak) by storing
  the context in a unique_ptr with a deleter that frees the context.
- Optimized ChannelEncryption encryption code somewhat by reducing allocations via more use of
  string_views and tweaking how we build the encrypted strings.
- Fix legacy (i.e. Monero) signature generation: the random byte value being generated was only
  setting the first 11 bytes of 32.
- Miscellaneous code cleanups (much of which are C++14/17 syntax).
- Moved std::literals namespace import to a small number of top-level headers so they are available
  everywhere.  (std::literals is guaranteed to never conflict with user-space literals precisely so
  that doing this everywhere is perfectly safe without polluting -- i.e. "foo"sv, 44s can *never* be
  anything other than string_view and seconds literals).
- Made pubkey parsing (e.g. in HTTP headers) accept any of hex/base32z/base64 so that we can, at
  some point in the future, just stop using base32z representation (since it conflicts badly with
  lokinet .snode address which is based on the ed25519 pubkey, not the legacy pubkey).
- RateLimiter: simplify it to take the IP as a uint32_t (and thus avoid allocations).  (It similarly
  avoids allocations in the SN case by using the new pubkey type).
- Move some repeated tasks into the simpler oxenmq add_timer().
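The type-safe pubkey container described in the first bullet can be sketched roughly as follows.
This is a minimal illustration only; the member names (`hex()`, `view()`) and exact layout are
assumptions, not the refactor's actual implementation:

```cpp
#include <array>
#include <string>
#include <string_view>

// A pubkey type built on std::array: fixed-size, allocation-free storage,
// with encoding done on demand only at the API boundary.
struct legacy_pubkey : std::array<unsigned char, 32> {
    // Hex-encode on demand; the stored representation stays binary.
    std::string hex() const {
        constexpr char digits[] = "0123456789abcdef";
        std::string out;
        out.reserve(2 * size());
        for (unsigned char b : *this) {
            out += digits[b >> 4];
            out += digits[b & 0x0f];
        }
        return out;
    }
    // Raw binary view, e.g. for feeding to a base32z/base64 encoder.
    std::string_view view() const {
        return {reinterpret_cast<const char*>(data()), size()};
    }
};
```

Because the type is a distinct struct (not a bare `std::array` or `std::string`), a legacy pubkey
can never be accidentally passed where an ed25519 or x25519 key is expected.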

Ping reporting:

This completely rewrites how ping results are handled.

Current situation: SS does its own testing of other SS.  Every 10s it picks some random node and
sends an HTTP ping and an OMQ ping to it.  If either fails it tracks the failure and retries
every 10s.  If they are still failing after 2h, then it finally tells oxend about the failure
one time, remembers that it told oxend (in memory), and then never tells it again until the remote
SS starts responding to pings again; once that happens it tells oxend that it is responding again
and then never tells it anything else unless the remote starts failing again for 2h.

This has some major shortcomings:
- The 10s repeat will hammer a node pretty hard with pings, because if it is down for a while, most
  of the network will notice and be pinging it every 10s.  That means 1600x2 incoming requests every
  10s, which is pretty excessive.
- Oxend reporting edge case 1: If a node is bad and then the storage server restarts, SS won't have it
  in its "bad list" anymore, so it isn't testing it at all (until it gets selected randomly, but
  with 1600 nodes that is going to be an average of more than 2 hours and could be way longer).
  Thus oxend never gets the "all good" signal and will continue to think the node is bad for much,
  much longer than it actually is.  (In fact, it may *never* get a good signal again if it's working
  the next time SS randomly pings it).
- Restarts the other way are also a problem: when oxend restarts it doesn't know of any of the bad
  nodes anymore, but since SS only tells it once, oxend never learns about them and thinks they're good.
- `oxend print_sn <PUBKEY>` is much, much less useful than it could be: usually storage servers are
  in "awaiting first result" status because SS won't tell oxend anything until it has decided there
  is some failure.  When it tells you "last ping was .... ago" that's also completely useless
  because SS never reports any pings except for the first >2h bad result, and the first good result.

I suspect the reporting above was designed out of a concern that talking to oxend RPC too often
would overload it; that isn't the case anymore (and since SS now uses a persistent oxenmq connection
the requests are *extremely* fast since it doesn't even have to establish a connection).

So this PR overhauls it completely as follows:
- SS gets much dumber (i.e. simpler) w.r.t. pings: all it does is randomly pick probably-good nodes
  to test every 10s, and then pings known-failing nodes to re-test them.
- Retested nodes don't get pounded every 10s, instead they get the first retry after 10s, the second
  retry 20s after that, then 30s after that, and so on up to 5 minute intervals between re-tests.
- SS tells oxend *every* ping result that it gets (and doesn't track them, except to keep them or
  remove them from the "bad" list)
- Oxend then becomes responsible for deciding when a SS is bad enough to fail proofs.  On the oxend
  side the rule is:
    - if we have been receiving bad ping reports about a node from SS for more than 1h5min without
      any good ping in that time *and* we received a bad ping in the past 10 minutes then we
      consider it bad.  (the first condition is so that it has to have been bad for more than an
      hour, and the second condition is to ensure that SS is still sending us bad test results).
    - otherwise we consider it good (i.e. because either we aren't getting test results or because
      we're getting good test results).
    - Thus oxend can usefully and accurately report the last time some storage server was tested,
      which allows much better diagnostics of remote SN status.
- Thus if oxend restarts it'll start getting the bad results right away, and if SS restarts oxend
  will stop getting them (and then fall back to "no info means good").
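The two rules above (the SS-side retry backoff and the oxend-side badness test) can be sketched as
follows.  This is an illustrative reconstruction; the function names, signatures, and clock type
are assumptions, not the actual implementation:

```cpp
#include <algorithm>
#include <chrono>

using namespace std::literals;

// SS side: the nth consecutive failed test schedules a retest 10s * n later,
// capped at 5 minutes (10s, 20s, 30s, ..., 300s).
std::chrono::seconds retest_delay(int failures) {
    return std::min(failures * 10s, 300s);
}

// oxend side: a node counts as failing only if it has been reported bad for
// more than 1h5min with no good ping in that window, AND a bad report arrived
// within the last 10 minutes (i.e. SS is still actively sending results).
bool node_is_bad(
        std::chrono::steady_clock::time_point now,
        std::chrono::steady_clock::time_point first_bad,  // start of current bad streak
        std::chrono::steady_clock::time_point last_bad) { // most recent bad report
    return now - first_bad > 1h + 5min && now - last_bad < 10min;
}
```

The second condition is what makes restarts self-healing: if SS restarts and stops reporting, the
"bad report within 10 minutes" test fails and oxend falls back to treating the node as good.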
2021-04-18 14:50:40 -03:00


#include "request_handler.h"

#include "channel_encryption.hpp"
#include "http_connection.h"
#include "https_client.h"
#include "lmq_server.h"
#include "oxen_logger.h"
#include "service_node.h"
#include "utils.hpp"

#include <nlohmann/json.hpp>
#include <openssl/sha.h>
#include <oxenmq/base32z.h>
#include <oxenmq/base64.h>
#include <oxenmq/hex.h>

#include <sstream>
#include <string_view>
#include <unordered_set>

using nlohmann::json;

namespace oxen {

constexpr size_t MAX_MESSAGE_BODY = 102400; // 100 KB limit

std::string to_string(const Response& res) {
    std::stringstream ss;
    ss << "Status: " << static_cast<int>(res.status()) << ", ";
    ss << "ContentType: "
       << ((res.content_type() == ContentType::plaintext) ? "plaintext"
                                                          : "json")
       << ", ";
    ss << "Body: <" << res.message() << ">";
    return ss.str();
}
RequestHandler::RequestHandler(boost::asio::io_context& ioc, ServiceNode& sn,
                               const ChannelEncryption& ce)
    : ioc_(ioc), service_node_(sn), channel_cipher_(ce) {}

static json snodes_to_json(const std::vector<sn_record_t>& snodes) {
    json res_body;
    json snodes_json = json::array();
    for (const auto& sn : snodes) {
        snodes_json.push_back(json{
            {"address", oxenmq::to_base32z(sn.pubkey_legacy.view()) + ".snode"}, // Deprecated; use pubkey_legacy instead
            {"pubkey_legacy", sn.pubkey_legacy.hex()},
            {"pubkey_x25519", sn.pubkey_x25519.hex()},
            {"pubkey_ed25519", sn.pubkey_ed25519.hex()},
            {"port", std::to_string(sn.port)}, // Why is this a string?
            {"ip", sn.ip}});
    }
    res_body["snodes"] = snodes_json;
    return res_body;
}

static std::string obfuscate_pubkey(std::string_view pk) {
    std::string res;
    res += pk.substr(0, 2);
    res += "...";
    res += pk.substr(pk.length() - 3);
    return res;
}

/// TODO: this probably shouldn't return Response...
Response RequestHandler::handle_wrong_swarm(const user_pubkey_t& pubKey) {
    const std::vector<sn_record_t> nodes =
        service_node_.get_snodes_by_pk(pubKey);
    const json res_body = snodes_to_json(nodes);
    OXEN_LOG(trace, "Got client request to a wrong swarm");
    return Response{Status::MISDIRECTED_REQUEST, res_body.dump(),
                    ContentType::json};
}
std::string computeMessageHash(const std::string& timestamp,
                               const std::string& ttl,
                               const std::string& recipient,
                               const std::string& data) {
    SHA512_CTX ctx;
    SHA512_Init(&ctx);
    for (const auto* s : {&timestamp, &ttl, &recipient, &data})
        SHA512_Update(&ctx, s->data(), s->size());
    unsigned char hashResult[SHA512_DIGEST_LENGTH];
    SHA512_Final(hashResult, &ctx);
    return oxenmq::to_hex(std::begin(hashResult), std::end(hashResult));
}
Response RequestHandler::process_store(const json& params) {
    constexpr const char* fields[] = {"pubKey", "ttl", "timestamp", "data"};
    for (const auto& field : fields) {
        if (!params.contains(field)) {
            OXEN_LOG(debug, "Bad client request: no `{}` field", field);
            return Response{
                Status::BAD_REQUEST,
                fmt::format("invalid json: no `{}` field\n", field)};
        }
    }
    const auto& ttl = params.at("ttl").get_ref<const std::string&>();
    const auto& timestamp =
        params.at("timestamp").get_ref<const std::string&>();
    const auto& data = params.at("data").get_ref<const std::string&>();
    OXEN_LOG(trace, "Storing message: {}", data);
    bool created;
    auto pk =
        user_pubkey_t::create(params.at("pubKey").get<std::string>(), created);
    if (!created) {
        auto msg = fmt::format("Pubkey must be {} characters long\n",
                               get_user_pubkey_size());
        OXEN_LOG(debug, "{}", msg);
        return Response{Status::BAD_REQUEST, std::move(msg)};
    }
    if (data.size() > MAX_MESSAGE_BODY) {
        OXEN_LOG(debug, "Message body too long: {}", data.size());
        auto msg =
            fmt::format("Message body exceeds maximum allowed length of {}\n",
                        MAX_MESSAGE_BODY);
        return Response{Status::BAD_REQUEST, std::move(msg)};
    }
    if (!service_node_.is_pubkey_for_us(pk)) {
        return this->handle_wrong_swarm(pk);
    }
    uint64_t ttlInt;
    if (!util::parseTTL(ttl, ttlInt)) {
        OXEN_LOG(debug, "Forbidden. Invalid TTL: {}", ttl);
        return Response{Status::FORBIDDEN, "Provided TTL is not valid.\n"};
    }
    uint64_t timestampInt;
    if (!util::parseTimestamp(timestamp, ttlInt, timestampInt)) {
        OXEN_LOG(debug, "Forbidden. Invalid Timestamp: {}", timestamp);
        return Response{Status::NOT_ACCEPTABLE,
                        "Timestamp error: check your clock\n"};
    }
    auto messageHash = computeMessageHash(timestamp, ttl, pk.str(), data);
    bool success;
    try {
        const auto msg =
            message_t{pk.str(), data, messageHash, ttlInt, timestampInt};
        success = service_node_.process_store(msg);
    } catch (const std::exception& e) {
        OXEN_LOG(critical,
                 "Internal Server Error. Could not store message for {}",
                 obfuscate_pubkey(pk.str()));
        return Response{Status::INTERNAL_SERVER_ERROR, e.what()};
    }
    if (!success) {
        OXEN_LOG(warn, "Service node is initializing");
        return Response{Status::SERVICE_UNAVAILABLE,
                        "Service node is initializing\n"};
    }
    OXEN_LOG(trace, "Successfully stored message for {}",
             obfuscate_pubkey(pk.str()));
    json res_body;
    /// NOTE: difficulty is no longer used by modern clients, but
    /// we send this to avoid breaking older clients.
    res_body["difficulty"] = 1;
    return Response{Status::OK, res_body.dump(), ContentType::json};
}
void RequestHandler::process_oxend_request(
    const json& params, std::function<void(oxen::Response)> cb) {
    // Use string_view keys so lookup compares string contents; a deduced
    // unordered_set<const char*> would compare (and hash) pointer values.
    static const std::unordered_set<std::string_view> allowed_endpoints{
        "get_service_nodes", "ons_resolve"};
    const auto endpoint_it = params.find("endpoint");
    if (endpoint_it == params.end() || !endpoint_it->is_string()) {
        cb({Status::BAD_REQUEST, "missing 'endpoint'"});
        return;
    }
    const auto& endpoint_str = endpoint_it->get_ref<const std::string&>();
    if (!allowed_endpoints.count(endpoint_str)) {
        cb({Status::BAD_REQUEST,
            fmt::format("Endpoint not allowed: {}", endpoint_str)});
        return;
    }
    const auto oxend_params_it = params.find("oxend_params");
    if (oxend_params_it == params.end() || !oxend_params_it->is_object()) {
        cb({Status::BAD_REQUEST, "missing 'oxend_params'"});
        return;
    }
    auto rpc_endpoint = fmt::format("rpc.{}", endpoint_str);
    service_node_.omq_server().oxend_request(
        rpc_endpoint,
        [cb = std::move(cb)](bool success, auto&& data) {
            if (success && data.size() >= 2) {
                json res;
                if (data[0] != "200") {
                    res["error"] = {{"code", data[0]}, {"message", data[1]}};
                } else {
                    res["result"] = data[1];
                }
                cb({Status::OK, res.dump(), ContentType::json});
            } else {
                cb({Status::BAD_REQUEST, "unknown oxend error"});
            }
        },
        oxend_params_it->dump());
}
Response RequestHandler::process_retrieve_all() {
    std::vector<storage::Item> all_entries;
    bool res = service_node_.get_all_messages(all_entries);
    if (!res) {
        return Response{Status::INTERNAL_SERVER_ERROR,
                        "could not retrieve all entries\n"};
    }
    json messages = json::array();
    for (auto& entry : all_entries) {
        json item;
        item["data"] = entry.data;
        item["pk"] = entry.pub_key;
        messages.push_back(item);
    }
    json res_body;
    res_body["messages"] = messages;
    return Response{Status::OK, res_body.dump(), ContentType::json};
}

Response RequestHandler::process_snodes_by_pk(const json& params) const {
    if (!params.contains("pubKey")) {
        OXEN_LOG(debug, "Bad client request: no `pubKey` field");
        return Response{Status::BAD_REQUEST,
                        "invalid json: no `pubKey` field\n"};
    }
    bool success;
    const auto pk =
        user_pubkey_t::create(params.at("pubKey").get<std::string>(), success);
    if (!success) {
        auto msg = fmt::format("Pubkey must be {} characters long\n",
                               get_user_pubkey_size());
        OXEN_LOG(debug, "{}", msg);
        return Response{Status::BAD_REQUEST, std::move(msg)};
    }
    const std::vector<sn_record_t> nodes = service_node_.get_snodes_by_pk(pk);
    OXEN_LOG(debug, "Snodes by pk size: {}", nodes.size());
    const json res_body = snodes_to_json(nodes);
    OXEN_LOG(debug, "Snodes by pk: {}", res_body.dump());
    return Response{Status::OK, res_body.dump(), ContentType::json};
}
Response RequestHandler::process_retrieve(const json& params) {
    constexpr const char* fields[] = {"pubKey", "lastHash"};
    for (const auto& field : fields) {
        if (!params.contains(field)) {
            auto msg = fmt::format("invalid json: no `{}` field", field);
            OXEN_LOG(debug, "{}", msg);
            return Response{Status::BAD_REQUEST, std::move(msg)};
        }
    }
    bool success;
    const auto pk =
        user_pubkey_t::create(params["pubKey"].get<std::string>(), success);
    if (!success) {
        auto msg = fmt::format("Pubkey must be {} characters long\n",
                               get_user_pubkey_size());
        OXEN_LOG(debug, "{}", msg);
        return Response{Status::BAD_REQUEST, std::move(msg)};
    }
    if (!service_node_.is_pubkey_for_us(pk)) {
        return this->handle_wrong_swarm(pk);
    }
    const std::string& last_hash =
        params.at("lastHash").get_ref<const std::string&>();
    // Note: We removed long-polling
    std::vector<storage::Item> items;
    if (!service_node_.retrieve(pk.str(), last_hash, items)) {
        auto msg = fmt::format(
            "Internal Server Error. Could not retrieve messages for {}",
            obfuscate_pubkey(pk.str()));
        OXEN_LOG(critical, "{}", msg);
        return Response{Status::INTERNAL_SERVER_ERROR, std::move(msg)};
    }
    if (!items.empty()) {
        OXEN_LOG(trace, "Successfully retrieved messages for {}",
                 obfuscate_pubkey(pk.str()));
    }
    json res_body;
    json messages = json::array();
    for (const auto& item : items) {
        json message;
        message["hash"] = item.hash;
        /// TODO: calculate expiration time once only?
        message["expiration"] = item.timestamp + item.ttl;
        message["data"] = item.data;
        messages.push_back(message);
    }
    res_body["messages"] = messages;
    return Response{Status::OK, res_body.dump(), ContentType::json};
}
void RequestHandler::process_client_req(
    const std::string& req_json, std::function<void(oxen::Response)> cb) {
    OXEN_LOG(trace, "process_client_req str <{}>", req_json);
    const json body = json::parse(req_json, nullptr, false);
    if (body.is_discarded()) {
        OXEN_LOG(debug, "Bad client request: invalid json");
        cb(Response{Status::BAD_REQUEST, "invalid json\n"});
        return;
    }
    OXEN_LOG(trace, "process_client_req json <{}>", body.dump(2));
    const auto method_it = body.find("method");
    if (method_it == body.end() || !method_it->is_string()) {
        OXEN_LOG(debug, "Bad client request: no method field");
        cb(Response{Status::BAD_REQUEST, "invalid json: no `method` field\n"});
        return;
    }
    const auto& method_name = method_it->get_ref<const std::string&>();
    OXEN_LOG(trace, " - method name: {}", method_name);
    const auto params_it = body.find("params");
    if (params_it == body.end() || !params_it->is_object()) {
        OXEN_LOG(debug, "Bad client request: no params field");
        cb(Response{Status::BAD_REQUEST, "invalid json: no `params` field\n"});
        return;
    }
    if (method_name == "store") {
        OXEN_LOG(debug, "Process client request: store");
        cb(this->process_store(*params_it));
    } else if (method_name == "retrieve") {
        OXEN_LOG(debug, "Process client request: retrieve");
        cb(this->process_retrieve(*params_it));
        // TODO: maybe we should check if (some old) clients request
        // long-polling and then wait before responding to prevent spam
    } else if (method_name == "get_snodes_for_pubkey") {
        OXEN_LOG(debug, "Process client request: snodes for pubkey");
        cb(this->process_snodes_by_pk(*params_it));
    } else if (method_name == "oxend_request") {
        OXEN_LOG(debug, "Process client request: oxend_request");
        this->process_oxend_request(*params_it, std::move(cb));
    } else if (method_name == "get_lns_mapping") {
        const auto name_it = params_it->find("name_hash");
        if (name_it == params_it->end()) {
            cb(Response{Status::BAD_REQUEST, "Field <name_hash> is missing"});
        } else {
            this->process_lns_request(*name_it, std::move(cb));
        }
    } else {
        OXEN_LOG(debug, "Bad client request: unknown method '{}'", method_name);
        cb(Response{Status::BAD_REQUEST,
                    fmt::format("no method {}", method_name)});
    }
}
Response RequestHandler::wrap_proxy_response(const Response& res,
                                             const x25519_pubkey& client_key,
                                             bool use_gcm) const {
    nlohmann::json json_res;
    json_res["status"] = res.status();
    json_res["body"] = res.message();
    const std::string res_body = json_res.dump();
    std::string ciphertext;
    if (use_gcm) {
        ciphertext = oxenmq::to_base64(
            channel_cipher_.encrypt_gcm(res_body, client_key));
    } else {
        ciphertext = oxenmq::to_base64(
            channel_cipher_.encrypt_cbc(res_body, client_key));
    }
    // why does this have to be json???
    return Response{Status::OK, std::move(ciphertext), ContentType::json};
}
void RequestHandler::process_lns_request(
    std::string name_hash, std::function<void(oxen::Response)> cb) {
    json params;
    json array = json::array();
    json entry;
    entry["name_hash"] = std::move(name_hash);
    json types = json::array();
    types.push_back(0);
    entry["types"] = types;
    array.push_back(entry);
    params["entries"] = array;
#ifdef INTEGRATION_TEST
    // use mainnet seed
    oxend_json_rpc_request(
        ioc_, "public.loki.foundation", 22023, "lns_names_to_owners", params,
        [cb = std::move(cb)](sn_response_t sn) {
            if (sn.error_code == SNodeError::NO_ERROR && sn.body)
                cb({Status::OK, *sn.body});
            else
                cb({Status::BAD_REQUEST, "unknown oxend error"});
        });
#else
    service_node_.omq_server().oxend_request(
        "rpc.lns_names_to_owners",
        [cb = std::move(cb)](bool success, auto&& data) {
            if (success && !data.empty())
                cb({Status::OK, data.front()});
            else
                cb({Status::BAD_REQUEST, "unknown oxend error"});
        },
        params.dump()); // actually send the request parameters built above
#endif
}
void RequestHandler::process_onion_exit(
    const x25519_pubkey& eph_key, const std::string& body,
    std::function<void(oxen::Response)> cb) {
    OXEN_LOG(debug, "Processing onion exit!");
    if (!service_node_.snode_ready()) {
        cb({Status::SERVICE_UNAVAILABLE, "Snode not ready"});
        return;
    }
    this->process_client_req(body, std::move(cb));
}

void RequestHandler::process_proxy_exit(
    const x25519_pubkey& client_key,
    std::string_view payload,
    std::function<void(oxen::Response)> cb) {
    if (!service_node_.snode_ready()) {
        auto res = Response{Status::SERVICE_UNAVAILABLE, "Snode not ready"};
        cb(wrap_proxy_response(res, client_key, false));
        return;
    }
    static int proxy_idx = 0;
    int idx = proxy_idx++;
    OXEN_LOG(debug, "[{}] Process proxy exit", idx);
    std::string plaintext;
    try {
        plaintext = channel_cipher_.decrypt_cbc(payload, client_key);
    } catch (const std::exception& e) {
        auto msg = fmt::format("Invalid ciphertext: {}", e.what());
        OXEN_LOG(debug, "{}", msg);
        auto res = Response{Status::BAD_REQUEST, std::move(msg)};
        // TODO: since we always seem to encrypt the response, we should
        // do it once one level above instead
        cb(wrap_proxy_response(res, client_key, false));
        return;
    }
    std::string body;
    bool lp_used = false;
    try {
        const json req = json::parse(plaintext, nullptr, true);
        body = req.at("body").get<std::string>();
        if (req.find("headers") != req.end()) {
            if (req.at("headers").find(OXEN_LONG_POLL_HEADER) !=
                req.at("headers").end()) {
                lp_used =
                    req.at("headers").at(OXEN_LONG_POLL_HEADER).get<bool>();
            }
        }
    } catch (const std::exception& e) {
        auto msg = fmt::format("JSON parsing error: {}", e.what());
        OXEN_LOG(debug, "[{}] {}", idx, msg);
        auto res = Response{Status::BAD_REQUEST, msg};
        cb(wrap_proxy_response(res, client_key, false /* use cbc */));
        return;
    }
    if (lp_used) {
        OXEN_LOG(debug, "Long polling requested over a proxy request");
    }
    this->process_client_req(
        body, [this, cb = std::move(cb), client_key, idx](oxen::Response res) {
            OXEN_LOG(debug, "[{}] proxy about to respond with: {}", idx,
                     res.status());
            cb(wrap_proxy_response(res, client_key, false /* use cbc */));
        });
}
void RequestHandler::process_onion_to_url(
    const std::string& protocol, const std::string& host, uint16_t port,
    const std::string& target, const std::string& payload,
    std::function<void(oxen::Response)> cb) {
    // TODO: investigate if the use of a shared pointer is necessary
    auto req = std::make_shared<request_t>();
    req->body() = payload;
    req->set(http::field::host, host);
    req->method(http::verb::post);
    req->target(target);
    req->prepare_payload();
    // `cb` needs to be adapted for the http request
    auto http_cb = [cb = std::move(cb)](sn_response_t res) {
        if (res.error_code == SNodeError::NO_ERROR) {
            cb(oxen::Response{Status::OK, *res.body});
        } else {
            OXEN_LOG(debug, "Oxen server error: {}", res.error_code);
            cb(oxen::Response{Status::BAD_REQUEST, "Oxen Server error"});
        }
    };
    if (protocol != "https") {
        make_http_request(ioc_, host, port, req, http_cb);
    } else {
        make_https_request(ioc_, host, port, req, http_cb);
    }
}

} // namespace oxen