Initial lokimq integration. Onion requests to SS.

This commit is contained in:
Maxim Shishmarev 2020-03-06 17:27:15 +11:00 committed by Maxim Shishmarev
parent 4909d206b9
commit 330581b0d4
24 changed files with 1520 additions and 719 deletions

3
.gitmodules vendored
View file

@ -1,3 +1,6 @@
[submodule "vendors/spdlog"]
path = vendors/spdlog
url = https://github.com/gabime/spdlog.git
[submodule "vendors/loki-mq"]
path = vendors/loki-mq
url = git@github.com:loki-project/loki-mq.git

View file

@ -24,6 +24,14 @@ if (DISABLE_SNODE_SIGNATURE)
add_definitions(-DDISABLE_SNODE_SIGNATURE)
endif()
list (APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_LIST_DIR}/contrib")
set(sodium_USE_STATIC_LIBS ON)
find_package(sodium REQUIRED)
include_directories(${sodium_INCLUDE_DIR})
include_directories("${CMAKE_CURRENT_LIST_DIR}/vendors/loki-mq/cppzmq")
include_directories("${CMAKE_CURRENT_LIST_DIR}/vendors/loki-mq/mapbox-variant/include")
loki_add_subdirectory(common)
loki_add_subdirectory(utils)
loki_add_subdirectory(crypto)
@ -32,7 +40,6 @@ loki_add_subdirectory(storage)
loki_add_subdirectory(httpserver)
loki_add_subdirectory(vendors/spdlog)
if (BUILD_TESTS)
loki_add_subdirectory(unit_test)
endif ()

View file

@ -19,13 +19,15 @@ struct sn_record_t {
private:
uint16_t port_;
std::string sn_address_; // Snode address (pubkey plus .snode, was used for lokinet)
// TODO: create separate types for different encodings of pubkeys,
// so if we confuse them, it will be a compiler error
std::string sn_address_; // Snode address (pubkey plus .snode, was used for lokinet)
std::string pub_key_base_32z_; // We don't need this! (esp. since it is legacy key)
std::string pubkey_x25519_hex_;
std::string pubkey_ed25519_hex_;
std::string pub_key_hex_; // Monero legacy key
// Required by LokiMQ
std::string pubkey_x25519_bin_;
std::string ip_; // Snode ip
@ -43,9 +45,11 @@ struct sn_record_t {
public:
sn_record_t(uint16_t port, const std::string& address,
const std::string& pk_hex, const std::string& pk_x25519,
const std::string& pk_ed25519, const std::string& ip)
const std::string& pk_x25519_bin, const std::string& pk_ed25519,
const std::string& ip)
: port_(port), pub_key_hex_(pk_hex), pubkey_x25519_hex_(pk_x25519),
pubkey_ed25519_hex_(pk_ed25519), ip_(ip) {
pubkey_x25519_bin_(pk_x25519_bin), pubkey_ed25519_hex_(pk_ed25519),
ip_(ip) {
set_address(address);
}
@ -60,6 +64,7 @@ struct sn_record_t {
const std::string& pub_key_hex() const { return pub_key_hex_; }
const std::string& pubkey_x25519_hex() const { return pubkey_x25519_hex_; }
const std::string& pubkey_ed25519_hex() const { return pubkey_ed25519_hex_; }
const std::string& pubkey_x25519_bin() const { return pubkey_x25519_bin_; }
const std::string& ip() const { return ip_; }
template <typename OStream>

View file

@ -51,10 +51,6 @@ target_link_libraries(crypto PRIVATE ${Boost_LIBRARIES})
add_executable(crypto_test src/test_main.cpp)
target_link_libraries(crypto_test PRIVATE crypto)
# sodium
list (APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_LIST_DIR}/../contrib")
set(sodium_USE_STATIC_LIBS ON)
find_package(sodium REQUIRED)
target_link_libraries(crypto PUBLIC sodium)
if (UNIX AND NOT APPLE)

View file

@ -4,18 +4,21 @@
#include <string>
#include <vector>
// Why is this even a template??
template <typename T>
class ChannelEncryption {
public:
ChannelEncryption(const std::vector<uint8_t>& private_key);
~ChannelEncryption() = default;
T encrypt(const T& plainText, const std::string& pubKey) const;
T encrypt_cbc(const T& plainText, const std::string& pubKey) const;
T decrypt(const T& cipherText, const std::string& pubKey) const;
T encrypt_gcm(const T& plainText, const std::string& pubKey) const;
T decrypt_cbc(const T& cipherText, const std::string& pubKey) const;
T decrypt_gcm(const T& cipherText, const std::string& pubKey) const;
private:
std::vector<uint8_t>
calculateSharedSecret(const std::vector<uint8_t>& pubKey) const;
const std::vector<uint8_t> private_key_;
};

View file

@ -22,6 +22,8 @@ struct lokid_key_pair_t {
public_key_t public_key;
};
std::string key_to_string(const std::array<uint8_t, loki::KEY_LENGTH>& key);
private_key_t lokidKeyFromHex(const std::string& private_key_hex);
public_key_t derive_pubkey_legacy(const private_key_t& private_key);

View file

@ -5,9 +5,13 @@
#include <openssl/rand.h>
#include <sodium.h>
#include "utils.hpp"
#include <exception>
#include <string>
#include <iostream>
std::vector<uint8_t> hexToBytes(const std::string& hex) {
std::vector<uint8_t> temp;
boost::algorithm::unhex(hex, std::back_inserter(temp));
@ -18,26 +22,49 @@ template <typename T>
ChannelEncryption<T>::ChannelEncryption(const std::vector<uint8_t>& private_key)
: private_key_(private_key) {}
template <typename T>
std::vector<uint8_t> ChannelEncryption<T>::calculateSharedSecret(
const std::vector<uint8_t>& pubKey) const {
std::vector<uint8_t> sharedSecret(crypto_scalarmult_BYTES);
if (pubKey.size() != crypto_scalarmult_curve25519_BYTES) {
// Derive shared secret from our (ephemeral) `seckey` and the other party's `pubkey`
static std::vector<uint8_t>
calculate_shared_secret(const std::vector<uint8_t>& seckey,
const std::vector<uint8_t>& pubkey) {
std::vector<uint8_t> secret(crypto_scalarmult_BYTES);
if (pubkey.size() != crypto_scalarmult_curve25519_BYTES) {
throw std::runtime_error("Bad pubKey size");
}
if (crypto_scalarmult(sharedSecret.data(), this->private_key_.data(),
pubKey.data()) != 0) {
if (crypto_scalarmult(secret.data(), seckey.data(), pubkey.data()) != 0) {
throw std::runtime_error(
"Shared key derivation failed (crypto_scalarmult)");
}
return sharedSecret;
return secret;
}
static std::vector<uint8_t>
derive_symmetric_key(const std::vector<uint8_t>& seckey,
const std::vector<uint8_t>& pubkey) {
const std::vector<uint8_t> sharedKey = calculate_shared_secret(seckey, pubkey);
std::vector<uint8_t> derived_key(32);
const std::string salt_str = "LOKI";
const auto salt = reinterpret_cast<const unsigned char*>(salt_str.data());
crypto_auth_hmacsha256_state state;
crypto_auth_hmacsha256_init(&state, salt, salt_str.size());
crypto_auth_hmacsha256_update(&state, sharedKey.data(), sharedKey.size());
crypto_auth_hmacsha256_final(&state, derived_key.data());
return derived_key;
}
template <typename T>
T ChannelEncryption<T>::encrypt(const T& plaintext,
const std::string& pubKey) const {
T ChannelEncryption<T>::encrypt_cbc(const T& plaintext,
const std::string& pubKey) const {
const std::vector<uint8_t> pubKeyBytes = hexToBytes(pubKey);
const std::vector<uint8_t> sharedKey = calculateSharedSecret(pubKeyBytes);
const std::vector<uint8_t> sharedKey =
calculate_shared_secret(this->private_key_, pubKeyBytes);
// Initialise cipher
const EVP_CIPHER* cipher = EVP_aes_256_cbc();
@ -90,10 +117,76 @@ T ChannelEncryption<T>::encrypt(const T& plaintext,
}
template <typename T>
T ChannelEncryption<T>::decrypt(const T& ciphertextAndIV,
const std::string& pubKey) const {
T ChannelEncryption<T>::encrypt_gcm(const T& plaintext,
const std::string& pubKey) const {
const std::vector<uint8_t> pubKeyBytes = hexToBytes(pubKey);
const std::vector<uint8_t> sharedKey = calculateSharedSecret(pubKeyBytes);
const std::vector<uint8_t> derived_key = derive_symmetric_key(this->private_key_, pubKeyBytes);
T ciphertext;
// Ciphertext should always be the length of plaintext plus tag
ciphertext.resize(plaintext.size() + 16);
auto ciphertext_ptr = reinterpret_cast<unsigned char*>(&ciphertext[0]);
unsigned long long ciphertext_len;
const auto plaintext_ptr = reinterpret_cast<const unsigned char*>(&plaintext[0]);
unsigned char nonce[crypto_aead_aes256gcm_NPUBBYTES];
randombytes_buf(nonce, sizeof(nonce));
crypto_aead_aes256gcm_encrypt(ciphertext_ptr, &ciphertext_len, plaintext_ptr,
plaintext.size(), NULL, 0, NULL, nonce,
derived_key.data());
ciphertext.resize(ciphertext_len);
ciphertext.insert(ciphertext.begin(), std::begin(nonce), std::end(nonce));
// nonce (12 bytes) || ciphertext || tag (16 bytes)
return ciphertext;
}
template <typename T>
T ChannelEncryption<T>::decrypt_gcm(const T& iv_ciphertext_tag,
const std::string& pubKey) const {
const std::vector<uint8_t> pubKeyBytes = hexToBytes(pubKey);
const std::vector<uint8_t> derived_key = derive_symmetric_key(this->private_key_, pubKeyBytes);
T output;
// Plaintext should be (16 + 12) bytes shorter
output.resize(iv_ciphertext_tag.size() - 28);
auto outPtr = reinterpret_cast<unsigned char*>(&output[0]);
unsigned long long decrypted_len;
constexpr auto NONCE_SIZE = 12;
const auto ciphertext =
reinterpret_cast<const unsigned char*>(&iv_ciphertext_tag[0] + NONCE_SIZE);
const auto nonce =
reinterpret_cast<const unsigned char*>(&iv_ciphertext_tag[0]);
unsigned long long clen = iv_ciphertext_tag.size() - NONCE_SIZE;
if (crypto_aead_aes256gcm_decrypt(
outPtr, &decrypted_len, NULL /* must be null */, ciphertext, clen,
NULL, 0, nonce, derived_key.data()) != 0) {
throw std::runtime_error("Could not decrypt (AES-GCM)");
}
assert(output.size() == decrypted_len);
return output;
}
template <typename T>
T ChannelEncryption<T>::decrypt_cbc(const T& ciphertextAndIV,
const std::string& pubKey) const {
const std::vector<uint8_t> pubKeyBytes = hexToBytes(pubKey);
const std::vector<uint8_t> sharedKey = calculate_shared_secret(this->private_key_, pubKeyBytes);
// Initialise cipher
const EVP_CIPHER* cipher = EVP_aes_256_cbc();
@ -121,7 +214,7 @@ T ChannelEncryption<T>::decrypt(const T& ciphertextAndIV,
// Decrypt every full blocks
if (EVP_DecryptUpdate(ctx, outPtr, &len, inPtr + ivLength,
ciphertextLength) <= 0) {
throw std::runtime_error("Could not initialise decryption context");
throw std::runtime_error("Could not decrypt block");
}
plaintextLength += len;
@ -134,6 +227,7 @@ T ChannelEncryption<T>::decrypt(const T& ciphertextAndIV,
// Remove excess bytes
output.resize(plaintextLength);
// Don't we need to call free even when we throw??
EVP_CIPHER_CTX_free(ctx);
return output;
}

View file

@ -72,4 +72,9 @@ public_key_t derive_pubkey_ed25519(const private_key_ed25519_t& seckey) {
return pubkey;
}
std::string key_to_string(const std::array<uint8_t, loki::KEY_LENGTH>& key) {
auto pk = reinterpret_cast<const char*>(&key);
return std::string{pk, loki::KEY_LENGTH};
}
} // namespace loki

View file

@ -19,6 +19,8 @@ set(HEADER_FILES
net_stats.h
dns_text_records.h
reachability_testing.h
lmq_server.h
request_handler.h
)
set(SRC_FILES
@ -34,6 +36,8 @@ set(SRC_FILES
command_line.cpp
dns_text_records.cpp
reachability_testing.cpp
lmq_server.cpp
request_handler.cpp
)
add_library(httpserver_lib STATIC ${HEADER_FILES} ${SRC_FILES})
@ -43,6 +47,7 @@ loki_add_subdirectory(../storage storage)
loki_add_subdirectory(../utils utils)
loki_add_subdirectory(../pow pow)
loki_add_subdirectory(../crypto crypto)
loki_add_subdirectory(../vendors/loki-mq loki-mq)
find_package(OpenSSL REQUIRED)
find_package(Boost
@ -60,10 +65,12 @@ target_link_libraries(httpserver_lib PUBLIC utils)
target_link_libraries(httpserver_lib PUBLIC pow)
target_link_libraries(httpserver_lib PUBLIC resolv)
target_link_libraries(httpserver_lib PUBLIC crypto)
target_link_libraries(httpserver_lib PUBLIC lokimq)
set_property(TARGET httpserver_lib PROPERTY CXX_STANDARD 14)
target_include_directories(httpserver_lib PUBLIC ${CMAKE_CURRENT_SOURCE_DIR} ${Boost_INCLUDE_DIRS})
target_include_directories(httpserver_lib PUBLIC ../vendors/loki-mq/lokimq)
target_link_libraries(httpserver_lib PUBLIC ${Boost_LIBRARIES})
set(BIN_NAME loki-storage)

View file

@ -25,6 +25,7 @@ void command_line_parser::parse_args(int argc, char* argv[]) {
("log-level", po::value(&options_.log_level), "Log verbosity level, see Log Levels below for accepted values")
("lokid-rpc-ip", po::value(&options_.lokid_rpc_port), "RPC IP on which the local Loki daemon is listening (usually localhost)")
("lokid-rpc-port", po::value(&options_.lokid_rpc_port), "RPC port on which the local Loki daemon is listening")
("lmq-port", po::value(&options_.lmq_port), "Port used by LokiMQ")
("testnet", po::bool_switch(&options_.testnet), "Start storage server in testnet mode")
("force-start", po::bool_switch(&options_.force_start), "Ignore the initialisation ready check")
("bind-ip", po::value(&options_.ip)->default_value("0.0.0.0"), "IP to which to bind the server")
@ -78,6 +79,10 @@ void command_line_parser::parse_args(int argc, char* argv[]) {
options_.lokid_rpc_port = 38157;
}
if (!vm.count("lmq-port")) {
throw std::runtime_error("lmq-port command line option is not specified");
}
if (!vm.count("ip") || !vm.count("port")) {
throw std::runtime_error(
"Invalid option: address and/or port missing.");

View file

@ -9,6 +9,7 @@ struct command_line_options {
uint16_t port;
std::string lokid_rpc_ip = "127.0.0.1";
uint16_t lokid_rpc_port = 22023; // Or 38157 if `testnet`
uint16_t lmq_port;
bool force_start = false;
bool print_version = false;
bool print_help = false;

View file

@ -1,7 +1,7 @@
#include "http_connection.h"
#include "Database.hpp"
#include "Item.hpp"
#include "channel_encryption.hpp"
#include "dev_sink.h"
#include "net_stats.h"
#include "rate_limiter.h"
@ -15,6 +15,8 @@
// needed for proxy requests
#include "https_client.h"
#include "request_handler.h"
#include <cstdlib>
#include <ctime>
#include <functional>
@ -48,14 +50,6 @@ namespace loki {
constexpr auto TEST_RETRY_PERIOD = std::chrono::milliseconds(50);
// Note: on the client side the limit is different
// as it is not encrypted/encoded there yet.
// The choice is somewhat arbitrary but it roughly
// corresponds to the client-side limit of 2000 chars
// of unencrypted message body in our experiments
// (rounded up)
constexpr size_t MAX_MESSAGE_BODY = 102400; // 100 KB limit;
std::shared_ptr<request_t> build_post_request(const char* target,
std::string&& data) {
auto req = std::make_shared<request_t>();
@ -204,8 +198,7 @@ namespace http_server {
static void
accept_connection(boost::asio::io_context& ioc,
boost::asio::ssl::context& ssl_ctx, tcp::acceptor& acceptor,
ServiceNode& sn,
ChannelEncryption<std::string>& channel_encryption,
ServiceNode& sn, RequestHandler& rh,
RateLimiter& rate_limiter, const Security& security) {
static boost::asio::steady_timer acceptor_timer(ioc);
@ -216,12 +209,11 @@ accept_connection(boost::asio::io_context& ioc,
if (!ec) {
std::make_shared<connection_t>(ioc, ssl_ctx, std::move(socket), sn,
channel_encryption, rate_limiter,
security)
rh, rate_limiter, security)
->start();
accept_connection(ioc, ssl_ctx, acceptor, sn, channel_encryption,
rate_limiter, security);
accept_connection(ioc, ssl_ctx, acceptor, sn, rh, rate_limiter,
security);
} else {
// TODO: remove this once we confirmed that there is
@ -248,16 +240,15 @@ accept_connection(boost::asio::io_context& ioc,
abort();
}
accept_connection(ioc, ssl_ctx, acceptor, sn,
channel_encryption, rate_limiter, security);
accept_connection(ioc, ssl_ctx, acceptor, sn, rh, rate_limiter,
security);
});
}
});
}
void run(boost::asio::io_context& ioc, const std::string& ip, uint16_t port,
const boost::filesystem::path& base_path, ServiceNode& sn,
ChannelEncryption<std::string>& channel_encryption,
const boost::filesystem::path& base_path, ServiceNode& sn, RequestHandler& rh,
RateLimiter& rate_limiter, Security& security) {
LOKI_LOG(trace, "http server run");
@ -273,8 +264,7 @@ void run(boost::asio::io_context& ioc, const std::string& ip, uint16_t port,
security.generate_cert_signature();
accept_connection(ioc, ssl_ctx, acceptor, sn, channel_encryption,
rate_limiter, security);
accept_connection(ioc, ssl_ctx, acceptor, sn, rh, rate_limiter, security);
ioc.run();
}
@ -283,12 +273,11 @@ void run(boost::asio::io_context& ioc, const std::string& ip, uint16_t port,
connection_t::connection_t(boost::asio::io_context& ioc, ssl::context& ssl_ctx,
tcp::socket socket, ServiceNode& sn,
ChannelEncryption<std::string>& channel_encryption,
RateLimiter& rate_limiter, const Security& security)
RequestHandler& rh, RateLimiter& rate_limiter,
const Security& security)
: ioc_(ioc), ssl_ctx_(ssl_ctx), socket_(std::move(socket)),
stream_(socket_, ssl_ctx_), service_node_(sn),
channel_cipher_(channel_encryption), rate_limiter_(rate_limiter),
repeat_timer_(ioc),
stream_(socket_, ssl_ctx_), service_node_(sn), request_handler_(rh),
rate_limiter_(rate_limiter), repeat_timer_(ioc),
deadline_(ioc, SESSION_TIME_LIMIT), notification_ctx_{boost::none},
security_(security) {
@ -543,6 +532,60 @@ static void print_headers(const request_t& req) {
}
}
void connection_t::process_onion_req() {
LOKI_LOG(debug, "Processing an onion request");
const request_t& req = this->request_.get();
// We are not expecting any headers, all parameters are in json body
// Need to make sure we are not blocking waiting for the response
delay_response_ = true;
auto on_response = [this](loki::Response res) {
LOKI_LOG(debug, "Got an onion response as guard node");
if (res.status() == Status::OK) {
response_.result(http::status::ok);
// OK here simply means that the response we got is
// coming from the target node as opposed to any other
// node on the path. The encrypted body will contain
// its own response status.
this->body_stream_ << res.message();
} else {
// res.status() is for us, we only report a generic
// error to indicate onion request failure
LOKI_LOG(debug, "Would send 503 error");
response_.result(http::status::service_unavailable);
}
this->write_response();
};
try {
const json json_req = json::parse(req.body(), nullptr, true);
// base64
const auto& ciphertext = json_req.at("ciphertext").get_ref<const std::string&>();
// hex
const auto& ephem_key = json_req.at("ephemeral_key").get_ref<const std::string&>();
request_handler_.process_onion_req(ciphertext, ephem_key, on_response);
} catch (const std::exception& e) {
auto msg = fmt::format("Error parsing outer JSON in onion request: {}", e.what());
LOKI_LOG(error, "{}", msg);
response_.result(http::status::bad_request);
this->body_stream_ << std::move(msg);
}
}
void connection_t::process_proxy_req() {
LOKI_LOG(debug, "Processing proxy request: we are first hop");
@ -564,14 +607,49 @@ void connection_t::process_proxy_req() {
const auto& sender_key = header_[LOKI_SENDER_KEY_HEADER];
const auto& target_snode_key = header_[LOKI_TARGET_SNODE_KEY];
service_node_.process_proxy_req(req.body(), sender_key, target_snode_key, [this] (sn_response_t res) {
auto sn = service_node_.find_node_by_ed25519_pk(target_snode_key);
if (res.raw_response) {
this->response_ = *res.raw_response;
static int req_counter = 0;
const int req_idx = req_counter;
// TODO: make an https response out of what we got back
auto on_proxy_response = [this, req_idx] (bool success, std::vector<std::string> data) {
LOKI_LOG(debug, "on proxy response: {}", success ? "success" : "failure");
if (success && data.size() == 1) {
LOKI_LOG(debug, "PROXY RESPONSE OK, idx: {}", req_idx);
this->body_stream_ << data[0];
response_.result(http::status::ok);
} else {
LOKI_LOG(debug, "PROXY RESPONSE FAILED, idx: {}", req_idx);
}
// This will return an empty, but failed response to the client
// if the raw_response is empty (we should provide better errors)
this->write_response();
});
};
if (!sn) {
LOKI_LOG(debug, "Could not find target snode for proxy: {}", target_snode_key);
on_proxy_response(false, {});
return;
}
LOKI_LOG(debug, "Target Snode: {}", target_snode_key);
// Send this request to SN over either HTTP or LOKIMQ
auto sn_req = ss_client::Request{req.body(), {{LOKI_SENDER_KEY_HEADER, sender_key}}};
LOKI_LOG(debug, "About to send a proxy exit requst, idx: {}", req_counter);
req_counter += 1;
service_node_.send_to_sn(*sn, ss_client::ReqMethod::PROXY_EXIT,
std::move(sn_req), on_proxy_response);
}
void connection_t::process_file_proxy_req() {
@ -681,7 +759,7 @@ void connection_t::process_swarm_req(boost::string_view target) {
/// Set to "bad request" by default
response_.result(http::status::bad_request);
LOKI_LOG(debug, "Got storage test request");
LOKI_LOG(trace, "Got storage test request");
using nlohmann::json;
@ -759,86 +837,47 @@ void connection_t::process_swarm_req(boost::string_view target) {
}
} else if (target == "/swarms/ping_test/v1") {
LOKI_LOG(debug, "Received ping_test");
response_.result(http::status::ok);
} else if (target == "/swarms/push/v1") {
LOKI_LOG(trace, "swarms/push");
/// NOTE:: we only expect one message here, but
/// for now lets reuse the function we already have
std::vector<message_t> messages = deserialize_messages(req.body());
assert(messages.size() == 1);
service_node_.process_push(messages.front());
LOKI_LOG(trace, "Received ping_test");
response_.result(http::status::ok);
} else if (target == "/swarms/proxy_exit") {
LOKI_LOG(debug, "Processing proxy request: we are the destination node");
#ifdef INTEGRATION_TEST
// print_headers(req);
#endif
const auto it = req.find(LOKI_SENDER_KEY_HEADER);
/// TODO: handle the error better?
if (it != req.end()) {
const std::string key = {it->value().data(), it->value().size()};
const auto plaintext = this->channel_cipher_.decrypt(req.body(), key);
try {
const json req = json::parse(plaintext, nullptr, true);
const auto body = req.at("body").get<std::string>();
this->response_modifier_ = [this, key](response_t& res) {
nlohmann::json json_res;
json_res["status"] = res.result_int();
json_res["body"] = res.body();
nlohmann::json headers;
for (const auto &field: res) {
std::string name = field.name_string().to_string();
headers[std::move(name)] = field.value();
}
json_res["headers"] = headers;
const std::string res_body = json_res.dump();
res.body() = util::base64_encode(this->channel_cipher_.encrypt(res_body, key));
res.result(http::status::ok);
};
LOKI_LOG(trace, "CLIENT HEADERS: \n\t{}", req.at("headers").dump(2));
// TODO: copy all other headers from decrypted body to header_?
const auto headers_it = req.find("headers");
if (headers_it != req.end()) {
const auto long_poll_it = headers_it->find(LOKI_LONG_POLL_HEADER);
if (long_poll_it != headers_it->end()) {
const bool val = long_poll_it->get<bool>();
LOKI_LOG(info, "field: {}: {}", LOKI_LONG_POLL_HEADER, val);
this->header_.insert({LOKI_LONG_POLL_HEADER, val ? "true" : "false"});
}
}
this->process_client_req(body);
} catch (std::exception& e) {
LOKI_LOG(error, "JSON parsing error: {}", e.what());
}
auto res = request_handler_.process_proxy_exit(key, req.body());
this->set_response(res);
} else {
LOKI_LOG(debug, "Error: {} header is missing", LOKI_SENDER_KEY_HEADER);
}
}
}
void connection_t::set_response(const Response& res) {
response_.result(static_cast<unsigned int>(res.status()));
std::string content_type;
switch (res.content_type()) {
case ContentType::plaintext:
content_type = "text/plain";
break;
case ContentType::json:
content_type = "application/json";
break;
default:
LOKI_LOG(critical, "Unrecognized content type");
}
response_.set(http::field::content_type, content_type);
body_stream_ << res.message();
}
// Determine what needs to be done with the request message.
void connection_t::process_request() {
@ -855,6 +894,13 @@ void connection_t::process_request() {
response_.result(http::status::internal_server_error);
const auto target = req.target();
const bool is_swarm_req = (target.find("/swarms/") == 0);
if (is_swarm_req) {
LOKI_LOG(debug, "Processing a swarm request: {}", target);
}
switch (req.method()) {
case http::verb::post: {
std::string reason;
@ -868,6 +914,7 @@ void connection_t::process_request() {
LOKI_LOG(debug,
"Ignoring post request; storage server not ready: {}",
reason);
LOKI_LOG(debug, "Would send 503 error (2)");
response_.result(http::status::service_unavailable);
body_stream_ << fmt::format("Service node is not ready: {}\n",
reason);
@ -889,29 +936,21 @@ void connection_t::process_request() {
e.what());
}
// TODO: parse target (once) to determine if it is a "swarms" call
} else if (target == "/swarms/push/v1") {
} else if (is_swarm_req) {
this->process_swarm_req(target);
} else if (target == "/swarms/push_batch/v1") {
this->process_swarm_req(target);
} else if (target == "/swarms/storage_test/v1") {
this->process_swarm_req(target);
} else if (target == "/swarms/blockchain_test/v1") {
this->process_swarm_req(target);
} else if (target == "/proxy") {
this->process_proxy_req();
} else if (target == "/onion_req") {
this->process_onion_req();
} else if (target == "/file_proxy") {
this->process_file_proxy_req();
} else if (target == "/swarms/proxy_exit") {
this->process_swarm_req(target);
}
#ifdef INTEGRATION_TEST
else if (target == "/retrieve_all") {
process_retrieve_all();
const auto res = request_handler_.process_retrieve_all();
this->set_response(res);
} else if (target == "/quit") {
LOKI_LOG(info, "POST /quit");
// a bit of a hack: sending response manually
@ -955,37 +994,10 @@ void connection_t::process_request() {
}
}
static std::string obfuscate_pubkey(const std::string& pk) {
std::string res = pk.substr(0, 2);
res += "...";
res += pk.substr(pk.length() - 3, pk.length() - 1);
return res;
}
// Asynchronously transmit the response message.
void connection_t::write_response() {
#ifndef DISABLE_ENCRYPTION
const auto it = header_.find(LOKI_EPHEMKEY_HEADER);
// TODO: do we need to separately handle the case where we can't find the
// key?
if (it != header_.end()) {
const std::string& ephemKey = it->second;
try {
auto body = channel_cipher_.encrypt(body_stream_.str(), ephemKey);
response_.body() = boost::beast::detail::base64_encode(body);
response_.set(http::field::content_type, "text/plain");
} catch (const std::exception& e) {
response_.result(http::status::internal_server_error);
response_.set(http::field::content_type, "text/plain");
body_stream_ << "Could not encrypt/encode response: ";
body_stream_ << e.what() << "\n";
LOKI_LOG(critical,
"Internal Server Error. Could not encrypt response for {}",
obfuscate_pubkey(ephemKey));
}
}
#else
LOKI_LOG(trace, "write response, {} bytes", response_.body().size());
const std::string body_stream = body_stream_.str();
@ -998,8 +1010,6 @@ void connection_t::write_response() {
response_.body() = body_stream_.str();
}
#endif
// Our last change to change the response before we start sending
if (this->response_modifier_) {
(*this->response_modifier_)(response_);
@ -1037,391 +1047,10 @@ bool connection_t::parse_header(const char* first, Args... args) {
return parse_header(first) && parse_header(args...);
}
json snodes_to_json(const std::vector<sn_record_t>& snodes) {
json res_body;
json snodes_json = json::array();
for (const auto& sn : snodes) {
json snode;
snode["address"] = sn.sn_address();
snode["pubkey_x25519"] = sn.pubkey_x25519_hex();
snode["pubkey_ed25519"] = sn.pubkey_ed25519_hex();
snode["port"] = std::to_string(sn.port());
snode["ip"] = sn.ip();
snodes_json.push_back(snode);
}
res_body["snodes"] = snodes_json;
return res_body;
}
void connection_t::process_store(const json& params) {
constexpr const char* fields[] = {"pubKey", "ttl", "nonce", "timestamp",
"data"};
for (const auto& field : fields) {
if (!params.contains(field)) {
response_.result(http::status::bad_request);
body_stream_ << fmt::format("invalid json: no `{}` field\n", field);
LOKI_LOG(debug, "Bad client request: no `{}` field", field);
return;
}
}
const auto ttl = params["ttl"].get<std::string>();
const auto nonce = params["nonce"].get<std::string>();
const auto timestamp = params["timestamp"].get<std::string>();
const auto data = params["data"].get<std::string>();
LOKI_LOG(trace, "Storing message: {}", data);
bool created;
auto pk =
user_pubkey_t::create(params["pubKey"].get<std::string>(), created);
if (!created) {
response_.result(http::status::bad_request);
body_stream_ << fmt::format("Pubkey must be {} characters long\n",
get_user_pubkey_size());
LOKI_LOG(error, "Pubkey must be {} characters long", get_user_pubkey_size());
return;
}
if (data.size() > MAX_MESSAGE_BODY) {
response_.result(http::status::bad_request);
body_stream_ << "Message body exceeds maximum allowed length of "
<< MAX_MESSAGE_BODY << "\n";
LOKI_LOG(debug, "Message body too long: {}", data.size());
return;
}
if (!service_node_.is_pubkey_for_us(pk)) {
handle_wrong_swarm(pk);
return;
}
#ifdef INTEGRATION_TEST
LOKI_LOG(trace, "store body: ", data);
#endif
uint64_t ttlInt;
if (!util::parseTTL(ttl, ttlInt)) {
response_.result(http::status::forbidden);
response_.set(http::field::content_type, "text/plain");
body_stream_ << "Provided TTL is not valid.\n";
LOKI_LOG(debug, "Forbidden. Invalid TTL: {}", ttl);
return;
}
uint64_t timestampInt;
if (!util::parseTimestamp(timestamp, ttlInt, timestampInt)) {
response_.result(http::status::not_acceptable);
response_.set(http::field::content_type, "text/plain");
body_stream_ << "Timestamp error: check your clock\n";
LOKI_LOG(debug, "Forbidden. Invalid Timestamp: {}", timestamp);
return;
}
// Do not store message if the PoW provided is invalid
std::string messageHash;
const bool valid_pow =
checkPoW(nonce, timestamp, ttl, pk.str(), data, messageHash,
service_node_.get_curr_pow_difficulty());
#ifndef DISABLE_POW
if (!valid_pow) {
response_.result(432); // unassigned http code
response_.set(http::field::content_type, "application/json");
json res_body;
res_body["difficulty"] = service_node_.get_curr_pow_difficulty();
LOKI_LOG(debug, "Forbidden. Invalid PoW nonce: {}", nonce);
/// This might throw if not utf-8 endoded
body_stream_ << res_body.dump();
return;
}
#endif
bool success;
try {
const auto msg =
message_t{pk.str(), data, messageHash, ttlInt, timestampInt, nonce};
success = service_node_.process_store(msg);
} catch (std::exception e) {
response_.result(http::status::internal_server_error);
response_.set(http::field::content_type, "text/plain");
body_stream_ << e.what() << "\n";
LOKI_LOG(critical,
"Internal Server Error. Could not store message for {}",
obfuscate_pubkey(pk.str()));
return;
}
if (!success) {
response_.result(http::status::service_unavailable);
response_.set(http::field::content_type, "text/plain");
/// This is not the only reason for faliure
body_stream_ << "Service node is initializing\n";
LOKI_LOG(warn, "Service node is initializing");
return;
}
response_.result(http::status::ok);
response_.set(http::field::content_type, "application/json");
json res_body;
res_body["difficulty"] = service_node_.get_curr_pow_difficulty();
body_stream_ << res_body.dump();
LOKI_LOG(trace, "Successfully stored message for {}",
obfuscate_pubkey(pk.str()));
}
void connection_t::process_snodes_by_pk(const json& params) {
if (!params.contains("pubKey")) {
response_.result(http::status::bad_request);
body_stream_ << "invalid json: no `pubKey` field\n";
LOKI_LOG(debug, "Bad client request: no `pubKey` field");
return;
}
bool success;
const auto pk =
user_pubkey_t::create(params["pubKey"].get<std::string>(), success);
if (!success) {
response_.result(http::status::bad_request);
body_stream_ << fmt::format("Pubkey must be {} characters long\n",
get_user_pubkey_size());
LOKI_LOG(debug, "Pubkey must be {} characters long ", get_user_pubkey_size());
return;
}
const std::vector<sn_record_t> nodes = service_node_.get_snodes_by_pk(pk);
const json res_body = snodes_to_json(nodes);
response_.result(http::status::ok);
response_.set(http::field::content_type, "application/json");
/// This might throw if not utf-8 endoded
body_stream_ << res_body.dump();
}
void connection_t::process_retrieve_all() {
std::vector<Item> all_entries;
bool res = service_node_.get_all_messages(all_entries);
if (!res) {
this->body_stream_ << "could not retrieve all entries\n";
response_.result(http::status::internal_server_error);
return;
}
json messages = json::array();
for (auto& entry : all_entries) {
json item;
item["data"] = entry.data;
item["pk"] = entry.pub_key;
messages.push_back(item);
}
json res_body;
res_body["messages"] = messages;
body_stream_ << res_body.dump();
response_.result(http::status::ok);
}
void connection_t::handle_wrong_swarm(const user_pubkey_t& pubKey) {
const std::vector<sn_record_t> nodes =
service_node_.get_snodes_by_pk(pubKey);
const json res_body = snodes_to_json(nodes);
response_.result(http::status::misdirected_request);
response_.set(http::field::content_type, "application/json");
/// This might throw if not utf-8 endoded
body_stream_ << res_body.dump();
LOKI_LOG(debug, "Client request for different swarm received");
}
constexpr auto LONG_POLL_TIMEOUT = std::chrono::milliseconds(20000);
template <typename T>
void connection_t::respond_with_messages(const std::vector<T>& items) {
json res_body;
json messages = json::array();
for (const auto& item : items) {
json message;
message["hash"] = item.hash;
/// TODO: calculate expiration time once only?
message["expiration"] = item.timestamp + item.ttl;
message["data"] = item.data;
messages.push_back(message);
}
res_body["messages"] = messages;
response_.result(http::status::ok);
response_.set(http::field::content_type, "application/json");
body_stream_ << res_body.dump();
this->write_response();
}
void connection_t::poll_db(const std::string& pk,
const std::string& last_hash) {
std::vector<Item> items;
if (!service_node_.retrieve(pk, last_hash, items)) {
response_.result(http::status::internal_server_error);
response_.set(http::field::content_type, "text/plain");
LOKI_LOG(critical,
"Internal Server Error. Could not retrieve messages for {}",
obfuscate_pubkey(pk));
return;
}
const bool lp_requested = header_.find(LOKI_LONG_POLL_HEADER) != header_.end();
if (!items.empty()) {
LOKI_LOG(trace, "Successfully retrieved messages for {}",
obfuscate_pubkey(pk));
}
if (items.empty() && lp_requested) {
auto self = shared_from_this();
// Instead of responding immediately, we delay the response
// until new data arrives for this PubKey
service_node_.register_listener(pk, self);
notification_ctx_ = notification_context_t{
boost::asio::steady_timer{ioc_}, boost::none, pk};
notification_ctx_->timer.expires_after(LONG_POLL_TIMEOUT);
notification_ctx_->timer.async_wait([=](const error_code& ec) {
if (ec == boost::asio::error::operation_aborted) {
LOKI_LOG(trace, "Notification timer manually triggered");
// we use timer cancellation as notification mechanism
std::vector<message_t> items;
auto msg = notification_ctx_->message;
if (msg) {
items.push_back(*msg);
}
respond_with_messages(items);
} else {
LOKI_LOG(trace, "Notification timer expired");
// If we are here, the notification timer expired
// with no messages ready
respond_with_messages<Item>({});
}
service_node_.remove_listener(pk, self.get());
});
} else {
respond_with_messages(items);
}
}
void connection_t::process_retrieve(const json& params) {
service_node_.all_stats_.bump_retrieve_requests();
constexpr const char* fields[] = {"pubKey", "lastHash"};
for (const auto& field : fields) {
if (!params.contains(field)) {
response_.result(http::status::bad_request);
body_stream_ << fmt::format("invalid json: no `{}` field\n", field);
LOKI_LOG(debug, "Bad client request: no `{}` field", field);
return;
}
}
bool success;
const auto pk =
user_pubkey_t::create(params["pubKey"].get<std::string>(), success);
if (!success) {
response_.result(http::status::bad_request);
body_stream_ << fmt::format("Pubkey must be {} characters long\n",
get_user_pubkey_size());
LOKI_LOG(debug, "Pubkey must be {} characters long ", get_user_pubkey_size());
return;
}
if (!service_node_.is_pubkey_for_us(pk)) {
handle_wrong_swarm(pk);
return;
}
const auto last_hash = params["lastHash"].get<std::string>();
// we are going to send the response anynchronously
// once we have new data
delay_response_ = true;
poll_db(pk.str(), last_hash);
}
void connection_t::process_client_req(const std::string& req_json) {
const json body = json::parse(req_json, nullptr, false);
if (body == nlohmann::detail::value_t::discarded) {
response_.result(http::status::bad_request);
body_stream_ << "invalid json\n";
LOKI_LOG(debug, "Bad client request: invalid json");
return;
}
const auto method_it = body.find("method");
if (method_it == body.end() || !method_it->is_string()) {
response_.result(http::status::bad_request);
body_stream_ << "invalid json: no `method` field\n";
LOKI_LOG(debug, "Bad client request: no method field");
return;
}
const auto method_name = method_it->get<std::string>();
const auto params_it = body.find("params");
if (params_it == body.end() || !params_it->is_object()) {
response_.result(http::status::bad_request);
body_stream_ << "invalid json: no `params` field\n";
LOKI_LOG(debug, "Bad client request: no params field");
return;
}
if (method_name == "store") {
LOKI_LOG(trace, "Process client request: store, connection: {}", this->conn_idx);
this->process_store(*params_it);
} else if (method_name == "retrieve") {
LOKI_LOG(trace, "Process client request: retrieve, connection: {}", this->conn_idx);
this->process_retrieve(*params_it);
} else if (method_name == "get_snodes_for_pubkey") {
LOKI_LOG(trace, "Process client request: snodes for pubkey");
this->process_snodes_by_pk(*params_it);
} else {
response_.result(http::status::bad_request);
body_stream_ << "no method" << method_name << "\n";
LOKI_LOG(debug, "Bad client request: unknown method '{}'", method_name);
}
}
/// Move this out of `connection_t` Process client request
/// Decouple responding from http
void connection_t::process_client_req_rate_limited() {
@ -1436,30 +1065,6 @@ void connection_t::process_client_req_rate_limited() {
return;
}
#ifndef DISABLE_ENCRYPTION
// Just in case we ever plan to enable this "channel encryption",
// this part will probably be broken for proxy requests (wrong
// place to look for headers)
if (!parse_header(LOKI_EPHEMKEY_HEADER)) {
LOKI_LOG(debug, "Bad client request: could not parse headers");
return;
}
try {
const std::string decoded =
boost::beast::detail::base64_decode(plain_text);
plain_text =
channel_cipher_.decrypt(decoded, header_[LOKI_EPHEMKEY_HEADER]);
} catch (const std::exception& e) {
response_.result(http::status::bad_request);
response_.set(http::field::content_type, "text/plain");
body_stream_ << "Could not decode/decrypt body: ";
body_stream_ << e.what() << "\n";
LOKI_LOG(debug, "Bad client request: could not decrypt body");
return;
}
#endif
// Not sure what the original idea was to distinguish between headers
// in request_ and the actual header_ field, but it is useful for
// "proxy" client requests as we can have both true html headers
@ -1468,7 +1073,8 @@ void connection_t::process_client_req_rate_limited() {
header_[LOKI_LONG_POLL_HEADER] = req.at(LOKI_LONG_POLL_HEADER).to_string();
}
this->process_client_req(plain_text);
const auto res = request_handler_.process_client_req(plain_text);
this->set_response(res);
}
void connection_t::register_deadline() {

View file

@ -41,6 +41,9 @@ std::shared_ptr<request_t> build_post_request(const char* target,
struct message_t;
struct Security;
class RequestHandler;
class Response;
namespace storage {
struct Item;
}
@ -186,9 +189,10 @@ class connection_t : public std::enable_shared_from_this<connection_t> {
// as opposed to directly after connection_t::process_request
bool delay_response_ = false;
// TODO: remove SN, only use Reqeust Handler as a mediator
ServiceNode& service_node_;
ChannelEncryption<std::string>& channel_cipher_;
RequestHandler& request_handler_;
RateLimiter& rate_limiter_;
@ -228,8 +232,7 @@ class connection_t : public std::enable_shared_from_this<connection_t> {
public:
connection_t(boost::asio::io_context& ioc, ssl::context& ssl_ctx,
tcp::socket socket, ServiceNode& sn,
ChannelEncryption<std::string>& channel_encryption,
tcp::socket socket, ServiceNode& sn, RequestHandler& rh,
RateLimiter& rate_limiter, const Security& security);
~connection_t();
@ -257,9 +260,6 @@ class connection_t : public std::enable_shared_from_this<connection_t> {
/// process GET /get_logs/v1; only returns errors atm
void on_get_logs();
/// Check the database for new data, reschedule if empty
void poll_db(const std::string& pk, const std::string& last_hash);
/// Determine what needs to be done with the request message
/// (synchronously).
void process_request();
@ -267,27 +267,16 @@ class connection_t : public std::enable_shared_from_this<connection_t> {
/// Unsubscribe listener (if any) and shutdown the connection
void clean_up();
void process_store(const nlohmann::json& params);
void process_retrieve(const nlohmann::json& params);
void process_snodes_by_pk(const nlohmann::json& params);
void process_retrieve_all();
template <typename T>
void respond_with_messages(const std::vector<T>& messages);
/// Asynchronously transmit the response message.
void write_response();
/// Syncronously (?) process client store/load requests
void process_client_req_rate_limited();
void process_client_req(const std::string& req_json);
void process_swarm_req(boost::string_view target);
void process_onion_req();
void process_proxy_req();
void process_file_proxy_req();
@ -304,21 +293,20 @@ class connection_t : public std::enable_shared_from_this<connection_t> {
const std::string& tester_pk,
bc_test_params_t params);
void set_response(const Response& res);
bool parse_header(const char* key);
template <typename... Args>
bool parse_header(const char* first, Args... args);
void handle_wrong_swarm(const user_pubkey_t& pubKey);
bool validate_snode_request();
bool verify_signature(const std::string& signature,
const std::string& public_key_b32z);
};
void run(boost::asio::io_context& ioc, const std::string& ip, uint16_t port,
const boost::filesystem::path& base_path, ServiceNode& sn,
ChannelEncryption<std::string>& channelEncryption,
const boost::filesystem::path& base_path, ServiceNode& sn, RequestHandler& rh,
RateLimiter& rate_limiter, Security&);
} // namespace http_server

View file

@ -292,6 +292,11 @@ void HttpsClientSession::trigger_callback(
}
void HttpsClientSession::do_close() {
// Note: I don't think both the server and the client
// should initiate the shutdown, but I'm going to ignore
// this error as we will remove https soon
// Gracefully close the stream
stream_.async_shutdown(std::bind(&HttpsClientSession::on_shutdown,
shared_from_this(),
@ -305,7 +310,7 @@ void HttpsClientSession::on_shutdown(boost::system::error_code ec) {
ec.assign(0, ec.category());
} else if (ec) {
// This one is too noisy, so demoted to debug:
LOKI_LOG(debug, "could not shutdown stream gracefully: {} ({})",
LOKI_LOG(trace, "could not shutdown stream gracefully: {} ({})",
ec.message(), ec.value());
}

187
httpserver/lmq_server.cpp Normal file
View file

@ -0,0 +1,187 @@
#include "lmq_server.h"
#include "loki_common.h"
#include "loki_logger.h"
#include "lokid_key.h"
#include "service_node.h"
#include "request_handler.h"
#include "utils.hpp"
#include "lokimq.h"
namespace loki {
std::string LokimqServer::peer_lookup(lokimq::string_view pubkey_bin) const {
LOKI_LOG(trace, "[LMQ] Peer Lookup");
// TODO: don't create a new string here
boost::optional<sn_record_t> sn =
this->service_node_->find_node_by_x25519_bin(std::string(pubkey_bin));
if (sn) {
// TODO: need to update Lokid to include lokimq_port, for now just
// add 200 and hope it is available
return fmt::format("tcp://{}:{}", sn->ip(), sn->port() + 200);
} else {
LOKI_LOG(debug, "[LMQ] peer node not found!");
return "";
}
}
lokimq::Allow
LokimqServer::auth_level_lookup(lokimq::string_view ip,
lokimq::string_view pubkey) const {
LOKI_LOG(debug, "[LMQ] Auth Level Lookup");
// TODO: make SN accept string_view
boost::optional<sn_record_t> sn =
this->service_node_->find_node_by_x25519_bin(std::string(pubkey));
bool is_sn = sn ? true : false;
LOKI_LOG(debug, "[LMQ] is service node: {}", is_sn);
return lokimq::Allow{lokimq::AuthLevel::none, is_sn};
};
void LokimqServer::handle_sn_data(lokimq::Message& message) {
LOKI_LOG(debug, "[LMQ] handle_sn_data");
LOKI_LOG(debug, "[LMQ] thread id: {}", std::this_thread::get_id());
LOKI_LOG(debug, "[LMQ] from: {}", util::as_hex(message.conn.pubkey()));
std::stringstream ss;
// We are only expecting a single part message, so consider removing this
for (auto& part : message.data) {
ss << part;
}
// TODO: proces push batch should move to "Request handler"
service_node_->process_push_batch(ss.str());
LOKI_LOG(debug, "[LMQ] send reply");
// TODO: Investigate if the above could fail and whether we should report
// that to the sending SN
message.send_reply();
};
void LokimqServer::handle_sn_proxy_exit(lokimq::Message& message) {
LOKI_LOG(debug, "[LMQ] handle_sn_proxy_exit");
LOKI_LOG(debug, "[LMQ] thread id: {}", std::this_thread::get_id());
LOKI_LOG(debug, "[LMQ] from: {}", util::as_hex(message.conn.pubkey()));
if (message.data.size() != 2) {
LOKI_LOG(debug, "Expected 2 message parts, got {}",
message.data.size());
return;
}
const auto& client_key = message.data[0];
const auto& payload = message.data[1];
// TODO: accept string_view?
auto res = request_handler_->process_proxy_exit(std::string(client_key), std::string(payload));
if (res.status() == Status::OK) {
message.send_reply(res.message());
} else {
// TODO: better handle this (unlikely) error
LOKI_LOG(debug, "Error: status is not OK for proxy_exit");
}
}
void LokimqServer::handle_onion_request(lokimq::Message& message) {
LOKI_LOG(debug, "Got an onion request over LOKIMQ");
auto reply_tag = message.reply_tag;
auto origin_pk = message.conn.pubkey();
auto on_response = [this, origin_pk, reply_tag](loki::Response res) mutable {
LOKI_LOG(debug, "on response: {}", to_string(res));
std::string status = std::to_string(static_cast<int>(res.status()));
lokimq_->send(origin_pk, "REPLY", reply_tag, std::move(status), res.message());
};
if (message.data.size() != 2) {
LOKI_LOG(error, "Expected 2 message parts, got {}", message.data.size());
on_response(loki::Response{Status::BAD_REQUEST, "Incorrect number of messages"});
return;
}
const auto& eph_key = message.data[0];
const auto& ciphertext = message.data[1];
request_handler_->process_onion_req(std::string(ciphertext), std::string(eph_key), on_response);
}
void LokimqServer::init(ServiceNode* sn, RequestHandler* rh,
const lokid_key_pair_t& keypair, uint16_t port) {
namespace ph = std::placeholders;
using lokimq::Allow;
using lokimq::string_view;
service_node_ = sn;
request_handler_ = rh;
auto pubkey = key_to_string(keypair.public_key);
auto seckey = key_to_string(keypair.private_key);
auto logger = [](lokimq::LogLevel level, const char* file, int line,
std::string message) {
LOKI_LOG(debug, "[line: {}]: {}", line, message);
};
auto lookup_fn = std::bind(&LokimqServer::peer_lookup, this, ph::_1);
auto allow_fn =
std::bind(&LokimqServer::auth_level_lookup, this, ph::_1, ph::_2);
lokimq_.reset(new LokiMQ{pubkey,
seckey,
true /* is service node */,
lookup_fn,
logger});
LOKI_LOG(info, "LokiMQ is listenting on port {}", port);
lokimq_->add_category("sn",
lokimq::Access{lokimq::AuthLevel::none, true, false});
lokimq_->log_level(lokimq::LogLevel::warn);
// ============= COMMANDS - BEGIN =============
lokimq_->add_request_command(
"sn", "data", std::bind(&LokimqServer::handle_sn_data, this, ph::_1));
lokimq_->add_request_command(
"sn", "proxy_exit",
std::bind(&LokimqServer::handle_sn_proxy_exit, this, ph::_1));
lokimq_->add_request_command(
"sn", "onion_req",
std::bind(&LokimqServer::handle_onion_request, this, ph::_1));
// +============= COMMANDS - END ==============
lokimq_->set_general_threads(1);
lokimq_->listen_curve(fmt::format("tcp://0.0.0.0:{}", port), allow_fn);
lokimq_->start();
}
LokimqServer::LokimqServer() = default;
LokimqServer::~LokimqServer() = default;
} // namespace loki

59
httpserver/lmq_server.h Normal file
View file

@ -0,0 +1,59 @@
#pragma once
#include <cstdint>
#include <string>
namespace lokimq {
class LokiMQ;
class simple_string_view;
using string_view = simple_string_view;
struct Allow;
class Message;
} // namespace lokimq
using lokimq::LokiMQ;
namespace loki {
struct lokid_key_pair_t;
class ServiceNode;
class RequestHandler;
class LokimqServer {
std::unique_ptr<LokiMQ> lokimq_;
// Has information about current SNs
ServiceNode* service_node_;
RequestHandler* request_handler_;
// Get nodes' address
std::string peer_lookup(lokimq::string_view pubkey_bin) const;
// Check if the node is SN
lokimq::Allow auth_level_lookup(lokimq::string_view ip,
lokimq::string_view pubkey) const;
// Handle Session data coming from peer SN
void handle_sn_data(lokimq::Message& message);
// Handle Session client requests arrived via proxy
void handle_sn_proxy_exit(lokimq::Message& message);
void handle_onion_request(lokimq::Message& message);
public:
LokimqServer();
~LokimqServer();
// Initialize lokimq
void init(ServiceNode* sn, RequestHandler* rh,
const lokid_key_pair_t& keypair, uint16_t port);
// TODO: maybe we should separate LokiMQ and LokimqServer, so we don't have
// to do this: Get underlying LokiMQ instance
LokiMQ* lmq() { return lokimq_.get(); }
};
} // namespace loki

View file

@ -7,8 +7,11 @@
#include "security.h"
#include "service_node.h"
#include "swarm.h"
#include "version.h"
#include "utils.hpp"
#include "version.h"
#include "lmq_server.h"
#include "request_handler.h"
#include <boost/filesystem.hpp>
#include <sodium.h>
@ -72,9 +75,11 @@ int main(int argc, char* argv[]) {
if (options.data_dir.empty()) {
if (auto home_dir = get_home_dir()) {
if (options.testnet) {
options.data_dir = (home_dir.get() / ".loki" / "testnet" / "storage").string();
options.data_dir =
(home_dir.get() / ".loki" / "testnet" / "storage").string();
} else {
options.data_dir = (home_dir.get() / ".loki" / "storage").string();
options.data_dir =
(home_dir.get() / ".loki" / "storage").string();
}
}
}
@ -94,7 +99,8 @@ int main(int argc, char* argv[]) {
if (options.testnet) {
loki::set_testnet();
LOKI_LOG(warn, "Starting in testnet mode, make sure this is intentional!");
LOKI_LOG(warn,
"Starting in testnet mode, make sure this is intentional!");
}
// Always print version for the logs
@ -118,8 +124,12 @@ int main(int argc, char* argv[]) {
LOKI_LOG(info, "Setting log level to {}", options.log_level);
LOKI_LOG(info, "Setting database location to {}", options.data_dir);
LOKI_LOG(info, "Setting Lokid RPC to {}:{}", options.lokid_rpc_ip, options.lokid_rpc_port);
LOKI_LOG(info, "Listening at address {} port {}", options.ip, options.port);
LOKI_LOG(info, "Setting Lokid RPC to {}:{}", options.lokid_rpc_ip,
options.lokid_rpc_port);
LOKI_LOG(info, "Https server is listening at {}:{}", options.ip,
options.port);
LOKI_LOG(info, "LokiMQ is listening at {}:{}", options.ip,
options.lmq_port);
boost::asio::io_context ioc{1};
boost::asio::io_context worker_ioc{1};
@ -129,6 +139,11 @@ int main(int argc, char* argv[]) {
return EXIT_FAILURE;
}
if (crypto_aead_aes256gcm_is_available() == 0) {
LOKI_LOG(error, "AES-256-GCM is not available on this CPU");
return EXIT_FAILURE;
}
{
const auto fd_limit = util::get_fd_limit();
if (fd_limit != -1) {
@ -140,7 +155,8 @@ int main(int argc, char* argv[]) {
try {
auto lokid_client = loki::LokidClient(ioc, options.lokid_rpc_ip, options.lokid_rpc_port);
auto lokid_client = loki::LokidClient(ioc, options.lokid_rpc_ip,
options.lokid_rpc_port);
// Normally we request the key from daemon, but in integrations/swarm
// testing we are not able to do that, so we extract the key as a
@ -158,7 +174,8 @@ int main(int argc, char* argv[]) {
private_key_x25519 = loki::lokidKeyFromHex(options.lokid_x25519_key);
LOKI_LOG(info, "x25519 SECRET KEY: {}", options.lokid_x25519_key);
private_key_ed25519 = loki::private_key_ed25519_t::from_hex(options.lokid_ed25519_key);
private_key_ed25519 =
loki::private_key_ed25519_t::from_hex(options.lokid_ed25519_key);
LOKI_LOG(info, "ed25519 SECRET KEY: {}", options.lokid_ed25519_key);
#endif
@ -189,10 +206,18 @@ int main(int argc, char* argv[]) {
loki::lokid_key_pair_t lokid_key_pair_x25519{private_key_x25519,
public_key_x25519};
loki::ServiceNode service_node(ioc, worker_ioc, options.port,
lokid_key_pair, lokid_key_pair_x25519,
options.data_dir, lokid_client,
options.force_start);
loki::LokimqServer lokimq_server;
// TODO: SN doesn't need lokimq_server, just the lmq components
loki::ServiceNode service_node(
ioc, worker_ioc, options.port, lokimq_server, lokid_key_pair,
options.data_dir, lokid_client, options.force_start);
loki::RequestHandler request_handler(service_node, channel_encryption);
lokimq_server.init(&service_node, &request_handler,
lokid_key_pair_x25519, options.lmq_port);
RateLimiter rate_limiter;
loki::Security security(lokid_key_pair, options.data_dir);
@ -203,9 +228,8 @@ int main(int argc, char* argv[]) {
systemd_watchdog_tick(systemd_watchdog_timer, service_node);
#endif
/// Should run http server
loki::http_server::run(ioc, options.ip, options.port, options.data_dir,
service_node, channel_encryption, rate_limiter,
service_node, request_handler, rate_limiter,
security);
} catch (const std::exception& e) {
// It seems possible for logging to throw its own exception,

View file

@ -0,0 +1,516 @@
#include "loki_logger.h"
#include "request_handler.h"
#include "service_node.h"
#include "utils.hpp"
#include "channel_encryption.hpp"
using nlohmann::json;
namespace loki {
constexpr size_t MAX_MESSAGE_BODY = 102400; // 100 KB limit
std::string to_string(const Response& res) {
std::stringstream ss;
ss << "Status: " << static_cast<int>(res.status()) << ", ";
ss << "ContentType: " << ((res.content_type() == ContentType::plaintext) ? "plaintext" : "json") << ", ";
ss << "Body: <" << res.message() << ">";
return ss.str();
}
RequestHandler::RequestHandler(ServiceNode& sn,
const ChannelEncryption<std::string>& ce)
: service_node_(sn), channel_cipher_(ce) {}
static json snodes_to_json(const std::vector<sn_record_t>& snodes) {
json res_body;
json snodes_json = json::array();
for (const auto& sn : snodes) {
json snode;
snode["address"] = sn.sn_address();
snode["pubkey_x25519"] = sn.pubkey_x25519_hex();
snode["pubkey_ed25519"] = sn.pubkey_ed25519_hex();
snode["port"] = std::to_string(sn.port());
snode["ip"] = sn.ip();
snodes_json.push_back(snode);
}
res_body["snodes"] = snodes_json;
return res_body;
}
static std::string obfuscate_pubkey(const std::string& pk) {
std::string res = pk.substr(0, 2);
res += "...";
res += pk.substr(pk.length() - 3, pk.length() - 1);
return res;
}
/// TODO: this probably shouldn't return Response...
Response RequestHandler::handle_wrong_swarm(const user_pubkey_t& pubKey) {
const std::vector<sn_record_t> nodes =
service_node_.get_snodes_by_pk(pubKey);
const json res_body = snodes_to_json(nodes);
LOKI_LOG(trace, "Got client request to a wrong swarm");
return Response{Status::MISDIRECTED_REQUEST, res_body.dump(), ContentType::json};
}
Response RequestHandler::process_store(const json& params) {
constexpr const char* fields[] = {"pubKey", "ttl", "nonce", "timestamp",
"data"};
for (const auto& field : fields) {
if (!params.contains(field)) {
LOKI_LOG(debug, "Bad client request: no `{}` field", field);
return Response{Status::BAD_REQUEST, fmt::format("invalid json: no `{}` field\n", field)};
}
}
const auto ttl = params.at("ttl").get_ref<const std::string&>();
const auto nonce = params.at("nonce").get_ref<const std::string&>();
const auto timestamp = params.at("timestamp").get_ref<const std::string&>();
const auto data = params.at("data").get_ref<const std::string&>();
LOKI_LOG(trace, "Storing message: {}", data);
bool created;
auto pk =
user_pubkey_t::create(params.at("pubKey").get<std::string>(), created);
if (!created) {
auto msg = fmt::format("Pubkey must be {} characters long\n",
get_user_pubkey_size());
LOKI_LOG(debug, "{}", msg);
return Response{Status::BAD_REQUEST, std::move(msg)};
}
if (data.size() > MAX_MESSAGE_BODY) {
LOKI_LOG(debug, "Message body too long: {}", data.size());
auto msg = fmt::format("Message body exceeds maximum allowed length of {}\n",
MAX_MESSAGE_BODY);
return Response{Status::BAD_REQUEST, std::move(msg)};
}
if (!service_node_.is_pubkey_for_us(pk)) {
return this->handle_wrong_swarm(pk);
}
uint64_t ttlInt;
if (!util::parseTTL(ttl, ttlInt)) {
LOKI_LOG(debug, "Forbidden. Invalid TTL: {}", ttl);
return Response{Status::FORBIDDEN, "Provided TTL is not valid.\n"};
}
uint64_t timestampInt;
if (!util::parseTimestamp(timestamp, ttlInt, timestampInt)) {
LOKI_LOG(debug, "Forbidden. Invalid Timestamp: {}", timestamp);
return Response{Status::NOT_ACCEPTABLE, "Timestamp error: check your clock\n"};
}
// Do not store message if the PoW provided is invalid
std::string messageHash;
const bool valid_pow =
checkPoW(nonce, timestamp, ttl, pk.str(), data, messageHash,
service_node_.get_curr_pow_difficulty());
#ifndef DISABLE_POW
if (!valid_pow) {
LOKI_LOG(debug, "Forbidden. Invalid PoW nonce: {}", nonce);
json res_body;
res_body["difficulty"] = service_node_.get_curr_pow_difficulty();
return Response{Status::INVALID_POW, res_body.dump(), ContentType::json};
}
#endif
bool success;
try {
const auto msg =
message_t{pk.str(), data, messageHash, ttlInt, timestampInt, nonce};
success = service_node_.process_store(msg);
} catch (std::exception e) {
LOKI_LOG(critical,
"Internal Server Error. Could not store message for {}",
obfuscate_pubkey(pk.str()));
return Response{Status::INTERNAL_SERVER_ERROR, e.what()};
}
if (!success) {
LOKI_LOG(warn, "Service node is initializing");
return Response{Status::SERVICE_UNAVAILABLE, "Service node is initializing\n"};
}
LOKI_LOG(trace, "Successfully stored message for {}",
obfuscate_pubkey(pk.str()));
json res_body;
res_body["difficulty"] = service_node_.get_curr_pow_difficulty();
return Response{Status::OK, res_body.dump(), ContentType::json};
}
Response RequestHandler::process_retrieve_all() {
std::vector<storage::Item> all_entries;
bool res = service_node_.get_all_messages(all_entries);
if (!res) {
return Response{Status::INTERNAL_SERVER_ERROR, "could not retrieve all entries\n"};
}
json messages = json::array();
for (auto& entry : all_entries) {
json item;
item["data"] = entry.data;
item["pk"] = entry.pub_key;
messages.push_back(item);
}
json res_body;
res_body["messages"] = messages;
return Response{Status::OK, res_body.dump(), ContentType::json};
}
Response RequestHandler::process_snodes_by_pk(const json& params) const {
if (!params.contains("pubKey")) {
LOKI_LOG(debug, "Bad client request: no `pubKey` field");
return Response{Status::BAD_REQUEST, "invalid json: no `pubKey` field\n"};
}
bool success;
const auto pk =
user_pubkey_t::create(params.at("pubKey").get<std::string>(), success);
if (!success) {
auto msg = fmt::format("Pubkey must be {} characters long\n",
get_user_pubkey_size());
LOKI_LOG(debug, "{}", msg);
return Response{Status::BAD_REQUEST, std::move(msg)};
}
const std::vector<sn_record_t> nodes = service_node_.get_snodes_by_pk(pk);
LOKI_LOG(debug, "Snodes by pk size: {}", nodes.size());
const json res_body = snodes_to_json(nodes);
LOKI_LOG(debug, "Snodes by pk: {}", res_body.dump());
return Response{Status::OK, res_body.dump(), ContentType::json};
}
Response RequestHandler::process_retrieve(const json& params) {
service_node_.all_stats_.bump_retrieve_requests();
constexpr const char* fields[] = {"pubKey", "lastHash"};
for (const auto& field : fields) {
if (!params.contains(field)) {
auto msg = fmt::format("invalid json: no `{}` field", field);
LOKI_LOG(debug, "{}", msg);
return Response{Status::BAD_REQUEST, std::move(msg)};
}
}
bool success;
const auto pk =
user_pubkey_t::create(params["pubKey"].get<std::string>(), success);
if (!success) {
auto msg = fmt::format("Pubkey must be {} characters long\n",
get_user_pubkey_size());
LOKI_LOG(debug, "{}", msg);
return Response{Status::BAD_REQUEST, std::move(msg)};
}
if (!service_node_.is_pubkey_for_us(pk)) {
return this->handle_wrong_swarm(pk);
}
const std::string& last_hash = params.at("lastHash").get_ref<const std::string&>();
// Note: We removed long-polling
std::vector<storage::Item> items;
if (!service_node_.retrieve(pk.str(), last_hash, items)) {
auto msg = fmt::format(
"Internal Server Error. Could not retrieve messages for {}",
obfuscate_pubkey(pk.str()));
LOKI_LOG(critical, "{}", msg);
return Response{Status::INTERNAL_SERVER_ERROR, std::move(msg)};
}
if (!items.empty()) {
LOKI_LOG(trace, "Successfully retrieved messages for {}",
obfuscate_pubkey(pk.str()));
}
json res_body;
json messages = json::array();
for (const auto& item : items) {
json message;
message["hash"] = item.hash;
/// TODO: calculate expiration time once only?
message["expiration"] = item.timestamp + item.ttl;
message["data"] = item.data;
messages.push_back(message);
}
res_body["messages"] = messages;
return Response{Status::OK, res_body.dump(), ContentType::json};
}
Response RequestHandler::process_client_req(const std::string& req_json) {
const json body = json::parse(req_json, nullptr, false);
if (body == nlohmann::detail::value_t::discarded) {
LOKI_LOG(debug, "Bad client request: invalid json");
return Response{Status::BAD_REQUEST, "invalid json\n"};
}
const auto method_it = body.find("method");
if (method_it == body.end() || !method_it->is_string()) {
LOKI_LOG(debug, "Bad client request: no method field");
return Response{Status::BAD_REQUEST, "invalid json: no `method` field\n"};
}
const auto& method_name = method_it->get_ref<const std::string&>();
const auto params_it = body.find("params");
if (params_it == body.end() || !params_it->is_object()) {
LOKI_LOG(debug, "Bad client request: no params field");
return Response{Status::BAD_REQUEST, "invalid json: no `params` field\n"};
}
if (method_name == "store") {
LOKI_LOG(debug, "Process client request: store");
return this->process_store(*params_it);
} else if (method_name == "retrieve") {
LOKI_LOG(debug, "Process client request: retrieve");
return this->process_retrieve(*params_it);
// TODO: maybe we should check if (some old) clients requests long-polling and
// then wait before responding to prevent spam
} else if (method_name == "get_snodes_for_pubkey") {
LOKI_LOG(debug, "Process client request: snodes for pubkey");
return this->process_snodes_by_pk(*params_it);
} else {
LOKI_LOG(debug, "Bad client request: unknown method '{}'", method_name);
return Response{Status::BAD_REQUEST, fmt::format("no method {}", method_name)};
}
}
Response
RequestHandler::wrap_proxy_response(const Response& res,
const std::string& client_key) const {
nlohmann::json json_res;
json_res["status"] = res.status();
json_res["body"] = res.message();
const std::string res_body = json_res.dump();
/// change to encrypt_gcm
std::string ciphertext = util::base64_encode(channel_cipher_.encrypt_gcm(res_body, client_key));
// why does this have to be json???
return Response{Status::OK, std::move(ciphertext), ContentType::json};
}
Response RequestHandler::process_onion_exit(const std::string& eph_key,
const std::string& payload) {
LOKI_LOG(debug, "Processing onion exit!");
std::string body;
try {
const json req = json::parse(payload, nullptr, true);
body = req.at("body").get<std::string>();
// TODO: check if the client requested long-polling and see if we want
// to do anything about it.
LOKI_LOG(debug, "CLIENT HEADERS: \n\t{}", req.at("headers").dump(2));
} catch (std::exception& e) {
auto msg = fmt::format("JSON parsing error: {}", e.what());
LOKI_LOG(error, "{}", msg);
return {Status::BAD_REQUEST, msg};
}
const auto res = this->process_client_req(body);
LOKI_LOG(debug, "about to respond with: {}", to_string(res));
return wrap_proxy_response(res, eph_key);
}
Response RequestHandler::process_proxy_exit(const std::string& client_key,
const std::string& payload) {
LOKI_LOG(debug, "Process proxy exit");
const auto plaintext = channel_cipher_.decrypt_cbc(payload, client_key);
std::string body;
try {
const json req = json::parse(plaintext, nullptr, true);
body = req.at("body").get<std::string>();
// TOOD: check if the client requested long-polling and see if we want
// to do anything about it.
LOKI_LOG(debug, "CLIENT HEADERS: \n\t{}", req.at("headers").dump(2));
} catch (std::exception& e) {
auto msg = fmt::format("JSON parsing error: {}", e.what());
LOKI_LOG(error, "{}", msg);
return {Status::BAD_REQUEST, msg};
}
const auto res = this->process_client_req(body);
LOKI_LOG(debug, "about to respond with: {}", to_string(res));
return wrap_proxy_response(res, client_key);
}
void RequestHandler::process_onion_req(const std::string& ciphertext,
const std::string& ephem_key,
std::function<void(loki::Response)> cb) {
std::string plaintext;
static int counter = 0;
try {
const std::string ciphertext_bin = util::base64_decode(ciphertext);
plaintext = channel_cipher_.decrypt_gcm(ciphertext_bin, ephem_key);
} catch (const std::exception& e) {
LOKI_LOG(debug, "Error decrypting an onion request: {}", e.what());
// Should this error be propagated back to the client?
cb(loki::Response{Status::BAD_REQUEST, "Invalid ciphertext"});
return;
}
LOKI_LOG(debug, "onion request decrypted: <{}>", plaintext);
try {
const json inner_json = json::parse(plaintext, nullptr, true);
if (inner_json.find("body") != inner_json.end()) {
LOKI_LOG(debug, "We are the final destination in the onion request!");
loki::Response res = this->process_onion_exit(ephem_key, plaintext);
cb(std::move(res));
return;
} else if (inner_json.find("url") != inner_json.end()) {
const auto& url = inner_json.at("url").get_ref<const std::string&>();
LOKI_LOG(debug, "We are to forward the request to url: {}", url);
// This will be an async request, so need to pass a callback (and make sure we don't respond until then)
// TODO2: make open groups work!
abort();
// cb()
return;
}
const auto& payload = inner_json.at("ciphertext").get_ref<const std::string&>();
const auto& dest = inner_json.at("destination").get_ref<const std::string&>();
const auto& ekey = inner_json.at("ephemeral_key").get_ref<const std::string&>();
auto sn = service_node_.find_node_by_ed25519_pk(dest);
if (!sn) {
auto msg = fmt::format("Next node not found: {}", dest);
LOKI_LOG(warn, "{}", msg);
auto res = loki::Response{Status::BAD_REQUEST, std::move(msg)};
cb(res);
return;
}
nlohmann::json req_body;
req_body["ciphertext"] = payload;
req_body["ephemeral_key"] = ekey;
auto on_response = [cb, counter_copy = counter](bool success, std::vector<std::string> data) {
LOKI_LOG(debug, "on onion response, {}", counter_copy);
LOKI_LOG(debug, " success: {}", success);
LOKI_LOG(debug, " data.size: {}", data.size());
for (const std::string& part : data) {
LOKI_LOG(debug, " part: {}", part);
}
if (!success) {
LOKI_LOG(debug, "[Onion request] Request time out");
cb(loki::Response{Status::BAD_REQUEST, "Request time out"});
return;
}
// We only expect a two-part message
if (data.size() != 2) {
LOKI_LOG(debug, "[Onion request] Incorrect number of messages: {}", data.size());
cb(loki::Response{Status::BAD_REQUEST, "Incorrect number of messages"});
return;
}
/// We use http status codes (for now)
if (data[0] == "200") {
cb(loki::Response{Status::OK, std::move(data[1])});
} else {
LOKI_LOG(debug, "Onion request relay failed with: {}", data[1]);
cb(loki::Response{Status::SERVICE_UNAVAILABLE, ""});
}
};
LOKI_LOG(debug, "send_onion_to_sn, sn: {} reqidx: {}", *sn, counter++);
// Note: we shouldn't use http here
service_node_.send_onion_to_sn(*sn, payload, ekey, on_response);
} catch (std::exception& e) {
LOKI_LOG(debug, "Error parsing inner JSON in onion request: {}", e.what());
cb(loki::Response{Status::BAD_REQUEST, "Invalid json"});
}
}
} // namespace loki

View file

@ -0,0 +1,118 @@
#pragma once
#include <string>
#include <string>
#include "loki_common.h"
// TODO: can I avoid including this in the header?
#include "../external/json.hpp"
// TODO: move ChannelEncryption to ::loki
template <typename T>
class ChannelEncryption;
namespace loki {
class ServiceNode;
enum class Status {
OK = 200,
BAD_REQUEST = 400,
FORBIDDEN = 403,
NOT_ACCEPTABLE = 406,
MISDIRECTED_REQUEST = 421,
INVALID_POW = 432, // unassigned http code
SERVICE_UNAVAILABLE = 503,
INTERNAL_SERVER_ERROR = 500,
};
enum class ContentType {
plaintext,
json,
};
namespace ss_client {
enum class ReqMethod {
DATA, // Database entries
PROXY_EXIT, // A session client request coming through a proxy
ONION_REQUEST,
};
class Request {
public:
std::string body;
// Might change this to a vector later
std::map<std::string, std::string> headers;
};
};
class Response {
Status status_;
std::string message_;
ContentType content_type_;
public:
Response(Status s, std::string m, ContentType ct = ContentType::plaintext)
: status_(s), message_(std::move(m)), content_type_(ct) {}
const std::string& message() const { return message_; }
Status status() const { return status_; }
ContentType content_type() const { return content_type_; }
};
std::string to_string(const Response& res);
class RequestHandler {
ServiceNode& service_node_;
const ChannelEncryption<std::string>& channel_cipher_;
// Wrap response `res` to an intermediate node
Response wrap_proxy_response(const Response& res,
const std::string& client_key) const;
// Return the correct swarm for `pubKey`
Response handle_wrong_swarm(const user_pubkey_t& pubKey);
// ===== Session Client Requests =====
// Similar to `handle_wrong_swarm`; but used when the swarm is requested explicitly
Response process_snodes_by_pk(const nlohmann::json& params) const;
// Save the message and relay the swarm
Response process_store(const nlohmann::json& params);
// Query the database and return requested messages
Response process_retrieve(const nlohmann::json& params);
Response process_onion_exit(const std::string& eph_key,
const std::string& payload);
// ===================================
public:
RequestHandler(ServiceNode& sn, const ChannelEncryption<std::string>& ce);
// Process all Session client requests
Response process_client_req(const std::string& req_json);
// Test only: retrieve all db entires
Response process_retrieve_all();
// Handle a Session client reqeust sent via SN proxy
Response process_proxy_exit(const std::string& client_key,
const std::string& payload);
// The result will arrive asynchronously, so it needs a callback handler
void process_onion_req(const std::string& ciphertext,
const std::string& ephem_key,
std::function<void(loki::Response)> cb);
};
}

View file

@ -12,6 +12,10 @@
#include "signature.h"
#include "utils.hpp"
#include "version.h"
#include "lokimq.h"
#include "lmq_server.h"
#include "request_handler.h"
#include "dns_text_records.h"
@ -40,8 +44,8 @@ static void make_sn_request(boost::asio::io_context& ioc, const sn_record_t& sn,
const std::shared_ptr<request_t>& req,
http_callback_t&& cb) {
// TODO: Return to using snode address instead of ip
return make_https_request(ioc, sn.ip(), sn.port(), sn.pub_key_base32z(),
req, std::move(cb));
make_https_request(ioc, sn.ip(), sn.port(), sn.pub_key_base32z(), req,
std::move(cb));
}
FailedRequestHandler::FailedRequestHandler(
@ -145,8 +149,8 @@ static bool verify_message(const message_t& msg,
ServiceNode::ServiceNode(boost::asio::io_context& ioc,
boost::asio::io_context& worker_ioc, uint16_t port,
LokimqServer& lmq_server,
const lokid_key_pair_t& lokid_key_pair,
const loki::lokid_key_pair_t& key_pair_x25519,
const std::string& db_location,
LokidClient& lokid_client, const bool force_start)
: ioc_(ioc), worker_ioc_(worker_ioc),
@ -154,9 +158,8 @@ ServiceNode::ServiceNode(boost::asio::io_context& ioc,
swarm_update_timer_(ioc), lokid_ping_timer_(ioc),
stats_cleanup_timer_(ioc), pow_update_timer_(worker_ioc),
check_version_timer_(worker_ioc), peer_ping_timer_(ioc),
relay_timer_(ioc), lokid_key_pair_(lokid_key_pair),
lokid_key_pair_x25519_(key_pair_x25519), lokid_client_(lokid_client),
force_start_(force_start) {
relay_timer_(ioc), lokid_key_pair_(lokid_key_pair), lmq_server_(lmq_server),
lokid_client_(lokid_client), force_start_(force_start) {
char buf[64] = {0};
if (!util::base32z_encode(lokid_key_pair_.public_key, buf)) {
@ -169,7 +172,8 @@ ServiceNode::ServiceNode(boost::asio::io_context& ioc,
const auto pk_hex = util::as_hex(lokid_key_pair_.public_key);
// TODO: get rid of "unused" fields
our_address_ = sn_record_t(port, addr, pk_hex, "unused", "unused", "1.1.1.1");
our_address_ = sn_record_t(port, addr, pk_hex, "unused", "unused", "unused",
"1.1.1.1");
// TODO: fail hard if we can't encode our public key
LOKI_LOG(info, "Read our snode address: {}", our_address_);
@ -188,7 +192,9 @@ ServiceNode::ServiceNode(boost::asio::io_context& ioc,
lokid_ping_timer_tick();
cleanup_timer_tick();
#ifndef INTEGRATION_TEST
ping_peers_tick();
#endif
worker_thread_ = boost::thread([this]() { worker_ioc_.run(); });
boost::asio::post(worker_ioc_, [this]() {
@ -200,6 +206,7 @@ ServiceNode::ServiceNode(boost::asio::io_context& ioc,
[this]() { this->check_version_timer_tick(); });
}
static block_update_t
parse_swarm_update(const std::shared_ptr<std::string>& response_body) {
@ -216,6 +223,8 @@ parse_swarm_update(const std::shared_ptr<std::string>& response_body) {
std::map<swarm_id_t, std::vector<sn_record_t>> swarm_map;
block_update_t bu;
LOKI_LOG(debug, "swarm repsonse: {}", *response_body);
try {
const auto& result = body.at("result");
bu.height = result.at("height").get<uint64_t>();
@ -239,15 +248,24 @@ parse_swarm_update(const std::shared_ptr<std::string>& response_body) {
const auto& snode_ip =
sn_json.at("public_ip").get_ref<const std::string&>();
const auto& pubkey_x25519 =
const auto& pubkey_x25519_hex =
sn_json.at("pubkey_x25519").get_ref<const std::string&>();
// lokidKeyFromHex works for pub keys too
const public_key_t pubkey_x25519 = lokidKeyFromHex(pubkey_x25519_hex);
const std::string pubkey_x25519_bin = key_to_string(pubkey_x25519);
const auto& pubkey_ed25519 =
sn_json.at("pubkey_ed25519").get_ref<const std::string&>();
const auto sn =
sn_record_t{port, std::move(snode_address), pubkey,
pubkey_x25519, pubkey_ed25519, snode_ip};
const auto sn = sn_record_t{port,
std::move(snode_address),
pubkey,
pubkey_x25519_hex,
pubkey_x25519_bin,
pubkey_ed25519,
snode_ip};
const bool fully_funded = sn_json.at("funded").get<bool>();
@ -378,40 +396,166 @@ ServiceNode::~ServiceNode() {
worker_thread_.join();
};
void ServiceNode::relay_data_reliable(const std::shared_ptr<request_t>& req,
static const char* method_to_str(ss_client::ReqMethod method, bool lmq) {
if (lmq) {
switch (method) {
case ss_client::ReqMethod::DATA:
return "sn.data";
case ss_client::ReqMethod::PROXY_EXIT:
return "sn.proxy_exit";
case ss_client::ReqMethod::ONION_REQUEST:
return "sn.onion_req";
default:
LOKI_LOG(critical, "UNKNOWN SS CLIENT METHOD");
return "<invalid>";
}
} else {
switch (method) {
case ss_client::ReqMethod::DATA:
return "/swarms/push_batch/v1";
case ss_client::ReqMethod::PROXY_EXIT:
return "/swarms/proxy_exit";
case ss_client::ReqMethod::ONION_REQUEST:
return "/onion_req";
default:
LOKI_LOG(critical, "UNKNOWN SS CLIENT METHOD");
return "<invalid>";
}
}
}
void ServiceNode::send_onion_to_sn(const sn_record_t& sn, const std::string& payload,
const std::string& eph_key,
ss_client::Callback cb) const {
lmq_server_.lmq()->request(sn.pubkey_x25519_bin(), "sn.onion_req",
std::move(cb), eph_key, payload);
}
// Calls callback on success only?
void ServiceNode::send_to_sn(const sn_record_t& sn, ss_client::ReqMethod method,
ss_client::Request req,
ss_client::Callback cb) const {
const bool use_lmq = this->hardfork_ >= LOKIMQ_ONION_HARDFORK;
const char* method_str = method_to_str(method, use_lmq);
if (use_lmq) {
// ===== make an lmq request =====
LOKI_LOG(debug, "Going to use lokimq to send {} request to {}", method_str, util::as_hex(sn.pubkey_x25519_bin()));
// NOTE: this is ugly, but we will remove HTTP option in the next release,
// and send_to_sn will probably go away with it.
switch (method) {
case ss_client::ReqMethod::DATA: {
lmq_server_.lmq()->request(sn.pubkey_x25519_bin(), method_str, std::move(cb), req.body);
break;
}
case ss_client::ReqMethod::PROXY_EXIT: {
auto client_key = req.headers.find(LOKI_SENDER_KEY_HEADER);
// I could just always assume that we are passing the right parameters...
if (client_key != req.headers.end()) {
lmq_server_.lmq()->request(sn.pubkey_x25519_bin(), method_str, std::move(cb), client_key->second, req.body);
} else {
LOKI_LOG(debug, "Developer error: no {} passed in headers", LOKI_SENDER_KEY_HEADER);
// TODO: call cb?
assert(false);
}
break;
}
}
} else {
// ===== make an https request =====
LOKI_LOG(debug, "Going to use HTTP to send a request");
auto http_req = build_post_request(method_str, std::move(req.body));
// NOTE: we are not signing headers (why not?)
this->sign_request(http_req);
for (auto& header : req.headers) {
LOKI_LOG(debug, " - {}:{}", header.first, header.second);
http_req->insert(header.first, header.second);
}
const bool needs_retrying =
static_cast<bool>(method == ss_client::ReqMethod::DATA);
// Note: often one of the reason for failure here is that the node has just
// deregistered but our SN hasn't updated its swarm list yet.
make_sn_request(ioc_, sn, http_req, [this, sn, http_req, cb, needs_retrying](sn_response_t&& res) {
if (res.error_code != SNodeError::NO_ERROR) {
all_stats_.record_request_failed(sn);
if (res.error_code == SNodeError::NO_REACH) {
LOKI_LOG(debug,
"Could not send to {} at first attempt: "
"(Unreachable)",
sn);
} else if (res.error_code == SNodeError::ERROR_OTHER) {
LOKI_LOG(debug,
"Could send to {} at first attempt: "
"(Generic error)",
sn);
}
if (!needs_retrying) {
cb(false, {});
return;
}
std::function<void()> give_up_cb = [this, sn, cb]() {
LOKI_LOG(debug, "Failed to send a request to: {}", sn);
this->all_stats_.record_push_failed(sn);
cb(false, {});
};
boost::optional<std::function<void()>> gu_cb = give_up_cb;
// TODO: only retry if we are sending messages
// Not sure if we should use this for all http requests (or at all)
std::make_shared<FailedRequestHandler>(ioc_, sn, http_req, std::move(gu_cb))
->init_timer();
} else {
LOKI_LOG(debug, "SN HTTP request is OK");
if (res.body) {
cb(true, {*res.body});
} else {
cb(true, {});
}
}
});
}
}
void ServiceNode::relay_data_reliable(const std::string& blob,
const sn_record_t& sn) const {
LOKI_LOG(trace, "Relaying data to: {}", sn);
auto reply_callback = [](bool success, std::vector<std::string> data) {
// Note: often one of the reason for failure here is that the node has just
// deregistered but our SN hasn't updated its swarm list yet.
make_sn_request(ioc_, sn, req, [this, sn, req](sn_response_t&& res) {
if (res.error_code != SNodeError::NO_ERROR) {
all_stats_.record_request_failed(sn);
if (res.error_code == SNodeError::NO_REACH) {
LOKI_LOG(debug,
"Could not relay data to {} at first attempt: "
"(Unreachable)",
sn);
} else if (res.error_code == SNodeError::ERROR_OTHER) {
LOKI_LOG(debug,
"Could not relay data to {} at first attempt: "
"(Generic error)",
sn);
}
std::function<void()> give_up_cb = [this, sn]() {
LOKI_LOG(debug, "Failed to send a request to: {}", sn);
this->all_stats_.record_push_failed(sn);
};
boost::optional<std::function<void()>> cb = give_up_cb;
std::make_shared<FailedRequestHandler>(ioc_, sn, req, std::move(cb))
->init_timer();
if (!success) {
LOKI_LOG(error, "Failed to send batch data: time-out");
}
});
};
LOKI_LOG(debug, "Relaying data to: {}", sn);
auto req = ss_client::Request{blob, {}};
this->send_to_sn(sn, ss_client::ReqMethod::DATA, std::move(req), reply_callback);
}
void ServiceNode::register_listener(const std::string& pk,
@ -524,32 +668,6 @@ void ServiceNode::process_push(const message_t& msg) {
save_if_new(msg);
}
void ServiceNode::process_proxy_req(const std::string& req_body,
const std::string& sender_key,
const std::string& target_snode,
http_callback_t&& on_proxy_response) {
auto sn = swarm_->find_node_by_ed25519_pk(target_snode);
if (!sn) {
LOKI_LOG(debug, "Could not find target snode for proxy: {}", target_snode);
on_proxy_response(sn_response_t{SNodeError::ERROR_OTHER, nullptr, boost::none});
return;
}
LOKI_LOG(trace, "Target Snode: {}", target_snode);
auto body_clone = req_body;
auto req = build_post_request("/swarms/proxy_exit", std::move(body_clone));
req->insert(LOKI_SENDER_KEY_HEADER, sender_key);
this->sign_request(req);
make_sn_request(ioc_, *sn, req, std::move(on_proxy_response));
}
void ServiceNode::save_if_new(const message_t& msg) {
if (db_->store(msg.hash, msg.pub_key, msg.data, msg.ttl, msg.timestamp,
@ -582,7 +700,10 @@ void ServiceNode::on_bootstrap_update(const block_update_t& bu) {
void ServiceNode::on_swarm_update(const block_update_t& bu) {
hardfork_ = bu.hardfork;
if (this->hardfork_ != bu.hardfork) {
LOKI_LOG(debug, "New hardfork: {}", bu.hardfork);
hardfork_ = bu.hardfork;
}
if (syncing_) {
if (target_height_ == 0) {
@ -667,7 +788,10 @@ void ServiceNode::on_swarm_update(const block_update_t& bu) {
salvage_data();
}
#ifndef INTEGRATION_TEST
initiate_peer_test();
#endif
}
void ServiceNode::relay_buffered_messages() {
@ -1199,7 +1323,7 @@ bool ServiceNode::derive_tester_testee(uint64_t blk_height, sn_record_t& tester,
block_hash = block_hash_;
} else if (blk_height < block_height_) {
LOKI_LOG(debug, "got storage test request for an older block: {}/{}",
LOKI_LOG(trace, "got storage test request for an older block: {}/{}",
blk_height, block_height_);
const auto it =
@ -1211,7 +1335,7 @@ bool ServiceNode::derive_tester_testee(uint64_t blk_height, sn_record_t& tester,
if (it != block_hashes_cache_.end()) {
block_hash = it->second;
} else {
LOKI_LOG(debug, "Could not find hash for a given block height");
LOKI_LOG(trace, "Could not find hash for a given block height");
// TODO: request from lokid?
return false;
}
@ -1498,35 +1622,21 @@ void ServiceNode::bootstrap_swarms(
template <typename Message>
void ServiceNode::relay_messages(const std::vector<Message>& messages,
const std::vector<sn_record_t>& snodes) const {
std::vector<std::string> data = serialize_messages(messages);
std::vector<std::string> batches = serialize_messages(messages);
LOKI_LOG(info, "Relayed messages:");
for (auto msg : messages) {
LOKI_LOG(info, " {}", msg.data);
LOKI_LOG(debug, "Relayed messages:");
for (auto msg : batches) {
LOKI_LOG(debug, " {}", msg);
}
LOKI_LOG(info, "To Snodes:");
LOKI_LOG(debug, "To Snodes:");
for (auto sn : snodes) {
LOKI_LOG(info, " {}", sn);
LOKI_LOG(debug, " {}", sn);
}
std::vector<signature> signatures;
signatures.reserve(data.size());
for (const auto& d : data) {
const auto hash = hash_data(d);
signatures.push_back(generate_signature(hash, lokid_key_pair_));
}
std::vector<std::shared_ptr<request_t>> batches =
make_batch_requests(std::move(data));
assert(batches.size() == signatures.size());
for (size_t i = 0; i < batches.size(); ++i) {
attach_signature(batches[i], signatures[i]);
}
LOKI_LOG(debug, "Serialised batches: {}", data.size());
LOKI_LOG(debug, "Serialised batches: {}", batches.size());
for (const sn_record_t& sn : snodes) {
for (const std::shared_ptr<request_t>& batch : batches) {
for (auto& batch : batches) {
// TODO: I could probably avoid copying here
relay_data_reliable(batch, sn);
}
}
@ -1695,7 +1805,7 @@ void ServiceNode::process_push_batch(const std::string& blob) {
m.data};
});
save_bulk(items);
this->save_bulk(items);
LOKI_LOG(trace, "Saving all: end");
}
@ -1744,4 +1854,24 @@ bool ServiceNode::is_snode_address_known(const std::string& sn_address) {
return swarm_->is_fully_funded_node(sn_address);
}
boost::optional<sn_record_t>
ServiceNode::find_node_by_x25519_bin(const sn_pub_key_t& pk) const {
if (swarm_) {
return swarm_->find_node_by_x25519_bin(pk);
}
return boost::none;
}
boost::optional<sn_record_t>
ServiceNode::find_node_by_ed25519_pk(const std::string& pk) const {
if (swarm_) {
return swarm_->find_node_by_ed25519_pk(pk);
}
return boost::none;
}
} // namespace loki

View file

@ -23,6 +23,7 @@
static constexpr size_t BLOCK_HASH_CACHE_SIZE = 30;
static constexpr int STORAGE_SERVER_HARDFORK = 12;
static constexpr int ENFORCED_REACHABILITY_HARDFORK = 13;
static constexpr int LOKIMQ_ONION_HARDFORK = 15;
class Database;
@ -40,6 +41,14 @@ struct blockchain_test_answer_t;
struct bc_test_params_t;
class LokidClient;
class LokimqServer;
namespace ss_client {
class Request;
enum class ReqMethod;
using Callback = std::function<void(bool success, std::vector<std::string>)>;
} // namespace ss_client
namespace http_server {
class connection_t;
@ -135,7 +144,8 @@ class ServiceNode {
std::unordered_map<pub_key_t, listeners_t> pk_to_listeners;
loki::lokid_key_pair_t lokid_key_pair_;
loki::lokid_key_pair_t lokid_key_pair_x25519_;
LokimqServer& lmq_server_;
reachability_records_t reach_records_;
@ -165,15 +175,13 @@ class ServiceNode {
/// (called when our old node got dissolved)
void salvage_data() const;
void sign_request(std::shared_ptr<request_t> &req) const;
void attach_signature(std::shared_ptr<request_t>& request,
const signature& sig) const;
void attach_pubkey(std::shared_ptr<request_t>& request) const;
/// Reliably push message/batch to a service node
void relay_data_reliable(const std::shared_ptr<request_t>& req,
void relay_data_reliable(const std::string& blob,
const sn_record_t& address) const;
template <typename Message>
@ -238,11 +246,13 @@ class ServiceNode {
// Ping some node and record its reachability
void test_reachability(const sn_record_t& sn);
void initLokiMQ(const loki::lokid_key_pair_t& keypair, uint16_t port);
public:
ServiceNode(boost::asio::io_context& ioc,
boost::asio::io_context& worker_ioc, uint16_t port,
LokimqServer& lmq_server,
const loki::lokid_key_pair_t& key_pair,
const loki::lokid_key_pair_t& key_pair_x25519,
const std::string& db_location, LokidClient& lokid_client,
const bool force_start);
@ -250,6 +260,18 @@ class ServiceNode {
mutable all_stats_t all_stats_;
// This is new, so it does not need to support http, thus new (if temp) method
void send_onion_to_sn(const sn_record_t& sn, const std::string& payload,
const std::string& eph_key, ss_client::Callback cb) const;
// TODO: move this eventually out of SN
// Send by either http or lmq
void send_to_sn(const sn_record_t& sn, ss_client::ReqMethod method,
ss_client::Request req,
ss_client::Callback cb) const;
void sign_request(std::shared_ptr<request_t> &req) const;
// Return true if the service node is ready to start running
bool snode_ready(boost::optional<std::string&> reason);
@ -270,11 +292,6 @@ class ServiceNode {
/// Process message received from a client, return false if not in a swarm
bool process_store(const message_t& msg);
void
process_proxy_req(const std::string& req, const std::string& sender_key,
const std::string& target_snode,
std::function<void(sn_response_t)>&& on_proxy_response);
/// Process message relayed from another SN from our swarm
void process_push(const message_t& msg);
@ -313,6 +330,13 @@ class ServiceNode {
std::string get_stats() const;
std::string get_status_line() const;
boost::optional<sn_record_t>
find_node_by_x25519_bin(const sn_pub_key_t& address) const;
boost::optional<sn_record_t>
find_node_by_ed25519_pk(const std::string& pk) const;
};
} // namespace loki

View file

@ -252,6 +252,18 @@ Swarm::find_node_by_ed25519_pk(const std::string& pk) const {
return boost::none;
}
boost::optional<sn_record_t>
Swarm::find_node_by_x25519_bin(const std::string& pk) const {
for (const auto& sn : all_funded_nodes_) {
if (sn.pubkey_x25519_bin() == pk) {
return sn;
}
}
return boost::none;
}
boost::optional<sn_record_t>
Swarm::get_node_by_pk(const sn_pub_key_t& pk) const {

View file

@ -111,6 +111,9 @@ class Swarm {
boost::optional<sn_record_t>
find_node_by_ed25519_pk(const sn_pub_key_t& address) const;
boost::optional<sn_record_t>
find_node_by_x25519_bin(const sn_pub_key_t& address) const;
};
} // namespace loki

1
vendors/loki-mq vendored Submodule

@ -0,0 +1 @@
Subproject commit 512086613a79a014d5b18da28dff3828bf76a1aa