Replace boost::beast with uWebSockets

uWebSockets is very fast, has a much nicer interface than boost::beast,
and is currently used for handling all http rpc requests in oxen-core.

As part of this:

- Remove net stats singleton; it was very granular -- just number of
  http connections established with no indication of what type of
  connections (e.g. snode-to-snode or client) those are.  Will think
  about reintroducing some less granular stats gathering in the future
  if we need them.
- Update oxenmq to dev branch (for new cancellable timer support)
- move boost asio io_context into ServiceNode rather than main (and
  expose it publicly)
- add signal handler and shutdown logic
- remove long-deprecated (and non-functional) detection of long-poll
  headers
- import string_utils.h from oxen-core (for durations, int parsing,
  etc.); we already had *some* of this code with copies of
  starts_with/ends_with/join.
- import file.hpp from oxen-core (for file slurping)
- move more request logic into request_handler
- move common http types/values into `oxen::http` namespace (e.g. status
  codes, header names)
This commit is contained in:
Jason Rhinelander 2021-05-25 17:29:49 -03:00
parent 2776f434e8
commit 1918c7f111
22 changed files with 446 additions and 1503 deletions

3
.gitmodules vendored
View file

@ -7,3 +7,6 @@
[submodule "vendors/nlohmann_json"]
path = vendors/nlohmann_json
url = https://github.com/nlohmann/json.git
[submodule "vendors/uWebSockets"]
path = vendors/uWebSockets
url = https://github.com/uNetworking/uWebSockets.git

View file

@ -12,7 +12,6 @@ add_library(httpserver_lib STATIC
rate_limiter.cpp
https_client.cpp
stats.cpp
security.cpp
command_line.cpp
reachability_testing.cpp
lmq_server.cpp
@ -21,6 +20,7 @@ add_library(httpserver_lib STATIC
ip_utils.cpp
oxend_rpc.cpp
server_certificates.cpp
https_server.cpp
)
# TODO: enable more warnings!
@ -28,6 +28,7 @@ target_compile_options(httpserver_lib PRIVATE -Wall -Werror)
target_link_libraries(httpserver_lib PUBLIC
common storage utils crypto
uWebSockets
OpenSSL::SSL OpenSSL::Crypto
nlohmann_json::nlohmann_json
oxenmq::oxenmq

View file

@ -1,9 +1,7 @@
#include "http_connection.h"
#include "Database.hpp"
#include "net_stats.h"
#include "rate_limiter.h"
#include "security.h"
#include "serialization.h"
#include "server_certificates.h"
#include "service_node.h"
@ -36,10 +34,7 @@ namespace oxen {
using error_code = boost::system::error_code;
using json = nlohmann::json;
using tcp = boost::asio::ip::tcp; // from <boost/asio.hpp>
namespace http = boost::beast::http; // from <boost/beast/http.hpp>
using tcp = boost::asio::ip::tcp;
std::ostream& operator<<(std::ostream& os, const sn_response_t& res) {
switch (res.error_code) {
@ -60,14 +55,12 @@ std::ostream& operator<<(std::ostream& os, const sn_response_t& res) {
return os << "(" << (res.body ? *res.body : "n/a") << ")";
}
constexpr auto TEST_RETRY_PERIOD = std::chrono::milliseconds(50);
std::shared_ptr<request_t> build_post_request(
const ed25519_pubkey& host, const char* target, std::string data) {
auto req = std::make_shared<request_t>();
req->body() = std::move(data);
req->method(http::verb::post);
req->set(http::field::host,
req->method(bhttp::verb::post);
req->set(bhttp::field::host,
(host ? oxenmq::to_base32z(host.view()) : "service-node") + ".snode");
req->target(target);
req->prepare_payload();
@ -144,7 +137,7 @@ void oxend_json_rpc_request(boost::asio::io_context& ioc,
req_body["params"] = params;
req->body() = req_body.dump();
req->method(http::verb::post);
req->method(bhttp::verb::post);
req->target(target);
req->prepare_payload();
@ -153,793 +146,6 @@ void oxend_json_rpc_request(boost::asio::io_context& ioc,
make_http_request(ioc, daemon_ip, daemon_port, req, std::move(cb));
}
// =============================================================
namespace http_server {
// "Loop" forever accepting new connections.
static void accept_connection(boost::asio::io_context& ioc,
boost::asio::ssl::context& ssl_ctx,
tcp::acceptor& acceptor, ServiceNode& sn,
RequestHandler& rh, RateLimiter& rate_limiter,
const Security& security) {
static boost::asio::steady_timer acceptor_timer(ioc);
constexpr std::chrono::milliseconds ACCEPT_DELAY = 50ms;
acceptor.async_accept([&](const error_code& ec, tcp::socket socket) {
OXEN_LOG(trace, "connection accepted");
if (!ec) {
std::make_shared<connection_t>(ioc, ssl_ctx, std::move(socket), sn,
rh, rate_limiter, security)
->start();
accept_connection(ioc, ssl_ctx, acceptor, sn, rh, rate_limiter,
security);
} else {
// TODO: remove this once we confirmed that there is
// no more socket leaking
if (ec == boost::system::errc::too_many_files_open) {
OXEN_LOG(critical, "Too many open files, aborting");
abort();
}
OXEN_LOG(
error,
"Could not accept a new connection {}: {}. Will only start "
"accepting new connections after a short delay.",
ec.value(), ec.message());
// If we fail here we are unlikely to be able to accept a new
// connection immediately, hence the delay
acceptor_timer.expires_after(ACCEPT_DELAY);
acceptor_timer.async_wait([&](const error_code& ec) {
if (ec && ec != boost::asio::error::operation_aborted) {
// Not sure how to recover here, so it is probably the
// safest to simply abort and let the launcher/systemd
// restart us
abort();
}
accept_connection(ioc, ssl_ctx, acceptor, sn, rh, rate_limiter,
security);
});
}
});
}
void run(boost::asio::io_context& ioc, const std::string& ip, uint16_t port,
const std::filesystem::path& base_path, ServiceNode& sn,
RequestHandler& rh, RateLimiter& rate_limiter, Security& security) {
OXEN_LOG(trace, "http server run");
const auto address =
boost::asio::ip::make_address(ip); /// throws if incorrect
tcp::acceptor acceptor{ioc, {address, port}};
ssl::context ssl_ctx{ssl::context::tlsv12};
load_server_certificate(base_path, ssl_ctx);
security.generate_cert_signature();
accept_connection(ioc, ssl_ctx, acceptor, sn, rh, rate_limiter, security);
ioc.run();
}
/// ============ connection_t ============
connection_t::connection_t(boost::asio::io_context& ioc, ssl::context& ssl_ctx,
tcp::socket socket, ServiceNode& sn,
RequestHandler& rh, RateLimiter& rate_limiter,
const Security& security)
: ioc_(ioc), ssl_ctx_(ssl_ctx), socket_(std::move(socket)),
stream_(socket_, ssl_ctx_), security_(security), service_node_(sn),
request_handler_(rh), rate_limiter_(rate_limiter), repeat_timer_(ioc),
deadline_(ioc, SESSION_TIME_LIMIT), notification_ctx_{std::nullopt} {
static uint64_t instance_counter = 0;
conn_idx = instance_counter++;
get_net_stats().connections_in++;
OXEN_LOG(trace, "connection_t [{}]", conn_idx);
request_.body_limit(1024 * 1024 * 10); // 10 mb
start_timestamp_ = std::chrono::steady_clock::now();
}
connection_t::~connection_t() {
// Safety net
if (stream_.lowest_layer().is_open()) {
OXEN_LOG(debug, "Client socket should be closed by this point, but "
"wasn't. Closing now.");
stream_.lowest_layer().close();
}
get_net_stats().connections_in--;
OXEN_LOG(trace, "~connection_t [{}]", conn_idx);
}
void connection_t::start() {
register_deadline();
do_handshake();
}
void connection_t::do_handshake() {
// Perform the SSL handshake
stream_.async_handshake(ssl::stream_base::server,
std::bind(&connection_t::on_handshake,
shared_from_this(),
std::placeholders::_1));
}
void connection_t::on_handshake(boost::system::error_code ec) {
const auto sockfd = stream_.lowest_layer().native_handle();
OXEN_LOG(trace, "Open https socket: {}", sockfd);
get_net_stats().record_socket_open(sockfd);
if (ec) {
OXEN_LOG(debug, "ssl handshake failed: ec: {} ({})", ec.value(),
ec.message());
this->clean_up();
deadline_.cancel();
return;
}
this->read_request();
}
void connection_t::clean_up() { this->do_close(); }
void connection_t::notify(const message_t* msg) {
if (!notification_ctx_) {
OXEN_LOG(error,
"Trying to notify a connection without notification context");
return;
}
if (msg) {
OXEN_LOG(trace, "Processing message notification: {}", msg->data);
// save messages, so we can access them once the timer event happens
notification_ctx_->message = *msg;
}
// the timer callback will be called once we complete the current callback
notification_ctx_->timer.cancel();
}
// Asynchronously receive a complete request message.
void connection_t::read_request() {
auto on_data = [self = shared_from_this()](error_code ec,
size_t bytes_transferred) {
OXEN_LOG(trace, "on data: {} bytes", bytes_transferred);
if (ec) {
OXEN_LOG(
error,
"Failed to read from a socket [{}: {}], connection idx: {}",
ec.value(), ec.message(), self->conn_idx);
self->clean_up();
self->deadline_.cancel();
return;
}
// NOTE: this is blocking, we should make this asynchronous
try {
self->process_request();
} catch (const std::exception& e) {
OXEN_LOG(critical, "Exception caught processing a request: {}",
e.what());
self->body_stream_ << e.what();
}
if (!self->delay_response_) {
self->write_response();
}
};
http::async_read(stream_, buffer_, request_, on_data);
}
// Parse a pubkey string value as either base32z (deprecated!), b64, or hex. Returns a null pk
// (i.e. operator bool() returns false) and warns on invalid input.
static legacy_pubkey parse_pubkey(std::string_view public_key_in) {
legacy_pubkey pk{};
if (public_key_in.size() == 64 && oxenmq::is_hex(public_key_in))
oxenmq::from_hex(public_key_in.begin(), public_key_in.end(), pk.begin());
else if ((public_key_in.size() == 43 || (public_key_in.size() == 44 && public_key_in.back() == '='))
&& oxenmq::is_base64(public_key_in))
oxenmq::from_base64(public_key_in.begin(), public_key_in.end(), pk.begin());
else if (public_key_in.size() == 52 && oxenmq::is_base32z(public_key_in))
oxenmq::from_base32z(public_key_in.begin(), public_key_in.end(), pk.begin());
else {
OXEN_LOG(warn, "Invalid public key header: not hex, b64, or b32z encoded");
OXEN_LOG(debug, "Received public key encoded value: {}", public_key_in);
}
return pk;
}
bool connection_t::validate_snode_request() {
if (!parse_header(OXEN_SENDER_SNODE_PUBKEY_HEADER,
OXEN_SNODE_SIGNATURE_HEADER)) {
OXEN_LOG(debug, "Missing signature headers for a Service Node request");
return false;
}
const auto& signature_b64 = header_[OXEN_SNODE_SIGNATURE_HEADER];
legacy_pubkey public_key = parse_pubkey(header_[OXEN_SENDER_SNODE_PUBKEY_HEADER]);
if (!public_key)
return false;
signature sig;
try {
sig = signature::from_base64(signature_b64);
} catch (const std::exception&) {
OXEN_LOG(warn, "invalid signature (not base64) found in header from {}",
public_key);
return false;
}
/// Known service node
if (!service_node_.find_node(public_key)) {
body_stream_ << "Unknown service node\n";
OXEN_LOG(debug, "Discarding signature from unknown service node: {}",
public_key);
response_.result(http::status::unauthorized);
return false;
}
if (!check_signature(sig, hash_data(request_.get().body()), public_key)) {
constexpr auto msg = "Could not verify snode signature"sv;
OXEN_LOG(debug, "{}", msg);
body_stream_ << msg;
response_.result(http::status::unauthorized);
return false;
}
if (rate_limiter_.should_rate_limit(public_key)) {
this->body_stream_ << "Too many requests\n";
response_.result(http::status::too_many_requests);
return false;
}
return true;
}
void connection_t::process_storage_test_req(uint64_t height,
const legacy_pubkey& tester_pk,
const std::string& msg_hash) {
OXEN_LOG(trace, "Performing storage test, attempt: {}", repetition_count_);
std::string answer;
/// TODO: we never actually test that `height` is within any reasonable
/// time window (or that it is not repeated multiple times), we should do
/// that! This is done implicitly to some degree using
/// `block_hashes_cache_`, which holds a limited number of recent blocks
/// only and fails if an earlier block is requested
const MessageTestStatus status = service_node_.process_storage_test_req(
height, tester_pk, msg_hash, answer);
const auto elapsed_time =
std::chrono::steady_clock::now() - start_timestamp_;
if (status == MessageTestStatus::SUCCESS) {
OXEN_LOG(
debug, "Storage test success! Attempts: {}. Took {} ms",
repetition_count_,
std::chrono::duration_cast<std::chrono::milliseconds>(elapsed_time)
.count());
delay_response_ = true;
nlohmann::json json_res;
json_res["status"] = "OK";
json_res["value"] = answer;
this->body_stream_ << json_res.dump();
response_.result(http::status::ok);
this->write_response();
} else if (status == MessageTestStatus::RETRY && elapsed_time < 1min) {
delay_response_ = true;
repetition_count_++;
repeat_timer_.expires_after(TEST_RETRY_PERIOD);
repeat_timer_.async_wait([self = shared_from_this(), height, msg_hash,
tester_pk](const error_code& ec) {
if (ec) {
if (ec != boost::asio::error::operation_aborted) {
OXEN_LOG(error,
"Repeat timer failed for storage test [{}: {}]",
ec.value(), ec.message());
}
} else {
self->process_storage_test_req(height, tester_pk, msg_hash);
}
});
} else if (status == MessageTestStatus::WRONG_REQ) {
nlohmann::json json_res;
json_res["status"] = "wrong request";
this->body_stream_ << json_res.dump();
response_.result(http::status::ok);
} else {
// Promote this to `error` once we enforce storage testing
OXEN_LOG(debug, "Failed storage test, tried {} times.",
repetition_count_);
nlohmann::json json_res;
json_res["status"] = "other";
this->body_stream_ << json_res.dump();
response_.result(http::status::ok);
}
}
static x25519_pubkey extract_x25519_from_hex(std::string_view hex) {
try {
return x25519_pubkey::from_hex(hex);
} catch (const std::exception& e) {
OXEN_LOG(warn, "Failed to decode ephemeral key in onion request: {}", e.what());
throw;
}
}
void connection_t::process_onion_req_v2() {
OXEN_LOG(debug, "Processing an onion request from client (v2)");
const request_t& req = this->request_.get();
// Need to make sure we are not blocking waiting for the response
delay_response_ = true;
OnionRequestMetadata data{
x25519_pubkey{},
[wself = weak_from_this()](oxen::Response res) {
OXEN_LOG(debug, "Got an onion response as edge node");
auto self = wself.lock();
if (!self) {
OXEN_LOG(debug,
"Connection is no longer valid, dropping onion response");
return;
}
self->body_stream_ << res.message();
self->response_.result(static_cast<int>(res.status()));
self->write_response();
},
0, // hopno
EncryptType::aes_gcm,
};
try {
auto [ciphertext, json_req] = parse_combined_payload(req.body());
data.ephem_key = extract_x25519_from_hex(
json_req.at("ephemeral_key").get_ref<const std::string&>());
if (auto it = json_req.find("enc_type"); it != json_req.end())
data.enc_type = parse_enc_type(it->get_ref<const std::string&>());
// Otherwise stay at default aes-gcm
// Allows a fake starting hop number (to make it harder for intermediate hops to know where
// they are). If omitted, defaults to 0.
if (auto it = json_req.find("hop_no"); it != json_req.end())
data.hop_no = std::max(0, it->get<int>());
service_node_.record_onion_request();
request_handler_.process_onion_req(ciphertext, std::move(data));
} catch (const std::exception& e) {
auto msg = fmt::format("Error parsing onion request: {}",
e.what());
OXEN_LOG(error, "{}", msg);
response_.result(http::status::bad_request);
this->body_stream_ << std::move(msg);
this->write_response();
}
}
void connection_t::process_swarm_req(std::string_view target) {
const request_t& req = this->request_.get();
// allow ping request as a quick workaround (and they are cheap)
if (!validate_snode_request() && (target != "/swarms/ping_test/v1")) {
return;
}
response_.set(OXEN_SNODE_SIGNATURE_HEADER, security_.get_cert_signature());
if (target == "/swarms/storage_test/v1") {
/// Set to "bad request" by default
response_.result(http::status::bad_request);
OXEN_LOG(trace, "Got storage test request");
using nlohmann::json;
const json body = json::parse(req.body(), nullptr, false);
if (body == nlohmann::detail::value_t::discarded) {
OXEN_LOG(debug, "Bad snode test request: invalid json");
body_stream_ << "invalid json\n";
return;
}
uint64_t blk_height;
std::string msg_hash;
try {
blk_height = body.at("height").get<uint64_t>();
msg_hash = body.at("hash").get<std::string>();
} catch (...) {
this->body_stream_
<< "Bad snode test request: missing fields in json";
OXEN_LOG(debug, "Bad snode test request: missing fields in json");
return;
}
const auto it = header_.find(OXEN_SENDER_SNODE_PUBKEY_HEADER);
if (it == header_.end()) {
OXEN_LOG(debug, "Ignoring test request, no pubkey present");
return;
}
auto tester_pk = parse_pubkey(it->second);
if (!tester_pk) {
OXEN_LOG(debug, "Ignoring test request, invalid pubkey");
return;
}
process_storage_test_req(blk_height, tester_pk, msg_hash);
} else if (target == "/swarms/ping_test/v1") {
OXEN_LOG(trace, "Received ping_test");
service_node_.update_last_ping(ReachType::HTTPS);
response_.result(http::status::ok);
}
}
void connection_t::set_response(const Response& res) {
response_.result(static_cast<unsigned int>(res.status()));
std::string content_type;
switch (res.content_type()) {
case ContentType::plaintext:
content_type = "text/plain";
break;
case ContentType::json:
content_type = "application/json";
break;
default:
OXEN_LOG(critical, "Unrecognized content type");
}
response_.set(http::field::content_type, content_type);
body_stream_ << res.message();
}
// Determine what needs to be done with the request message.
void connection_t::process_request() {
const request_t& req = this->request_.get();
/// This method is responsible for filling out response_
OXEN_LOG(debug, "connection_t::process_request");
response_.version(req.version());
response_.keep_alive(false);
/// TODO: make sure that we always send a response!
response_.result(http::status::internal_server_error);
const boost::string_view target0 = req.target();
const std::string_view target =
std::string_view(target0.data(), target0.size());
OXEN_LOG(debug, "target: {}", target);
const bool is_swarm_req = (target.find("/swarms/") == 0);
if (is_swarm_req) {
OXEN_LOG(debug, "Processing a swarm request: {}", target);
}
switch (req.method()) {
case http::verb::post: {
std::string reason;
// Respond to ping even if we are not ready
if (target == "/swarms/ping_test/v1") {
this->process_swarm_req(target);
break;
}
if (!service_node_.snode_ready(&reason)) {
OXEN_LOG(debug,
"Ignoring post request; storage server not ready: {}",
reason);
OXEN_LOG(debug, "Would send 503 error (2)");
response_.result(http::status::service_unavailable);
body_stream_ << fmt::format("Service node is not ready: {}\n",
reason);
break;
}
if (target == "/storage_rpc/v1") {
/// Store/load from clients
OXEN_LOG(trace, "POST /storage_rpc/v1");
try {
process_client_req_rate_limited();
} catch (const std::exception& e) {
this->body_stream_ << fmt::format(
"Exception caught while processing client request: {}",
e.what());
response_.result(http::status::internal_server_error);
OXEN_LOG(critical,
"Exception caught while processing client request: {}",
e.what());
}
} else if (is_swarm_req) {
this->process_swarm_req(target);
} else if (target == "/onion_req/v2") {
this->process_onion_req_v2();
}
#ifdef INTEGRATION_TEST
else if (target == "/retrieve_all") {
const auto res = request_handler_.process_retrieve_all();
this->set_response(res);
} else if (target == "/quit") {
OXEN_LOG(info, "POST /quit");
// a bit of a hack: sending response manually
delay_response_ = true;
response_.result(http::status::ok);
write_response();
ioc_.stop();
} else if (target == "/sleep") {
ioc_.post([]() {
OXEN_LOG(warn, "Sleeping for some time...");
std::this_thread::sleep_for(std::chrono::seconds(30));
});
response_.result(http::status::ok);
}
#endif
else {
OXEN_LOG(debug, "unknown target for POST: {}", target);
this->body_stream_
<< fmt::format("unknown target for POST: {}", target);
response_.result(http::status::not_found);
}
break;
}
case http::verb::get:
if (target == "/get_stats/v1") {
this->on_get_stats();
} else {
this->body_stream_
<< fmt::format("unknown target for GET: {}", target);
OXEN_LOG(debug, "unknown target for GET: {}", target);
response_.result(http::status::not_found);
}
break;
default:
OXEN_LOG(debug, "bad request");
response_.result(http::status::bad_request);
break;
}
}
// Asynchronously transmit the response message.
void connection_t::write_response() {
OXEN_LOG(trace, "write response, {} bytes", response_.body().size());
const std::string body_stream = body_stream_.str();
if (!body_stream.empty()) {
if (!response_.body().empty()) {
OXEN_LOG(debug, "Overwritting non-empty body in response!");
}
response_.body() = body_stream_.str();
}
// Our last change to change the response before we start sending
if (this->response_modifier_) {
this->response_modifier_(response_);
}
response_.set(http::field::content_length,
std::to_string(response_.body().size()));
/// This attempts to write all data to a stream
/// TODO: handle the case when we are trying to send too much
http::async_write(
stream_, response_, [self = shared_from_this()](error_code ec, size_t) {
if (ec && ec != boost::asio::error::operation_aborted) {
OXEN_LOG(error, "Failed to write to a socket: {}",
ec.message());
}
self->clean_up();
/// Is it too early to cancel the deadline here?
self->deadline_.cancel();
});
}
bool connection_t::parse_header(const char* key) {
const auto it = request_.get().find(key);
if (it == request_.get().end()) {
body_stream_ << "Missing field in header : " << key << "\n";
return false;
}
header_[key] = it->value().to_string();
return true;
}
template <typename... Args>
bool connection_t::parse_header(const char* first, Args... args) {
return parse_header(first) && parse_header(args...);
}
/// Move this out of `connection_t` Process client request
/// Decouple responding from http
void connection_t::process_client_req_rate_limited() {
OXEN_LOG(trace, "process_client_req_rate_limited");
const request_t& req = this->request_.get();
std::string plain_text = req.body();
auto addr = socket_.remote_endpoint().address();
if (!addr.is_v4()) {
// We don't (currently?) support IPv6 at all (SS published IPs are only IPv4) so if we
// somehow get an IPv6 address then it isn't a proper SS request so just drop it.
response_.result(http::status::bad_request);
OXEN_LOG(warn, "incoming client request is not IPv4; dropping it");
return;
}
if (rate_limiter_.should_rate_limit_client(addr.to_v4().to_uint())) {
this->body_stream_ << "too many requests\n";
response_.result(http::status::too_many_requests);
OXEN_LOG(debug, "Rate limiting client request.");
return;
}
// Not sure what the original idea was to distinguish between headers
// in request_ and the actual header_ field, but it is useful for
// "proxy" client requests as we can have both true html headers
// and the headers that came encrypted in body
if (req.find(OXEN_LONG_POLL_HEADER) != req.end()) {
header_[OXEN_LONG_POLL_HEADER] =
req.at(OXEN_LONG_POLL_HEADER).to_string();
}
const bool lp_requested =
header_.find(OXEN_LONG_POLL_HEADER) != header_.end();
// Annoyingly, we might still have old clients that expect long-polling
// to work, spamming us with "retrieve" requests. The workaround for now
// is to delay responding to the request for a few seconds
// Client requests can be asynchronous, so only respond in a callback
this->delay_response_ = true;
// TODO: remove this when we remove long-polling from (most) clients
if (lp_requested) {
OXEN_LOG(debug, "Received a long-polling request");
auto delay_timer = std::make_shared<boost::asio::steady_timer>(ioc_);
delay_timer->expires_after(std::chrono::seconds(2));
delay_timer->async_wait([self = shared_from_this(), delay_timer,
plaintext = std::move(plain_text)](
const error_code& ec) {
self->request_handler_.process_client_req(
plaintext, [wself = std::weak_ptr<connection_t>{self}](
oxen::Response res) {
auto self = wself.lock();
if (!self) {
OXEN_LOG(
debug,
"Connection is no longer valid, dropping response");
return;
}
OXEN_LOG(debug, "Respond to a long-polling client");
self->set_response(res);
self->write_response();
});
});
} else {
request_handler_.process_client_req(
plain_text, [wself = weak_from_this()](oxen::Response res) {
// // A connection could have been destroyed by the deadline
// timer
auto self = wself.lock();
if (!self) {
OXEN_LOG(debug, "Connection is no longer valid, dropping "
"proxy response");
return;
}
OXEN_LOG(debug, "Respond to a non-long polling client");
self->set_response(res);
self->write_response();
});
}
}
void connection_t::register_deadline() {
// Note: deadline callback captures a shared pointer to this, so
// the connection will not be destroyed until the timer goes off.
// If we want to destroy it earlier, we need to manually cancel the timer.
deadline_.async_wait([self = shared_from_this()](error_code ec) {
const bool cancelled =
(ec && ec == boost::asio::error::operation_aborted);
if (cancelled)
return;
// Note: cancelled timer does absolutely nothing, so we need to make
// sure we close the socket (and unsubscribe from notifications)
// elsewhere if we cancel it.
if (ec) {
OXEN_LOG(error, "Deadline timer error [{}]: {}", ec.value(),
ec.message());
}
OXEN_LOG(debug, "[{}] Closing [connection_t] socket due to timeout",
self->conn_idx);
self->clean_up();
});
}
void connection_t::do_close() {
// Perform the SSL shutdown
stream_.async_shutdown(std::bind(
&connection_t::on_shutdown, shared_from_this(), std::placeholders::_1));
}
void connection_t::on_shutdown(boost::system::error_code ec) {
if (ec == boost::asio::error::eof) {
// Rationale:
// http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
ec.assign(0, ec.category());
} else if (ec) {
OXEN_LOG(debug, "Could not close ssl stream gracefully, ec: {} ({})",
ec.message(), ec.value());
}
const auto sockfd = stream_.lowest_layer().native_handle();
OXEN_LOG(trace, "Close https socket: {}", sockfd);
get_net_stats().record_socket_close(sockfd);
stream_.lowest_layer().close();
}
void connection_t::on_get_stats() {
this->body_stream_ << service_node_.get_stats_for_session_client();
this->response_.result(http::status::ok);
}
/// ============
} // namespace http_server
/// TODO: make generic, avoid message copy
HttpClientSession::HttpClientSession(boost::asio::io_context& ioc,
const tcp::endpoint& ep,
@ -947,15 +153,13 @@ HttpClientSession::HttpClientSession(boost::asio::io_context& ioc,
http_callback_t&& cb)
: ioc_(ioc), socket_(ioc), endpoint_(ep), callback_(cb),
deadline_timer_(ioc), req_(req) {
get_net_stats().http_connections_out++;
}
void HttpClientSession::on_connect() {
const auto sockfd = socket_.native_handle();
OXEN_LOG(trace, "Open http socket: {}", sockfd);
get_net_stats().record_socket_open(sockfd);
http::async_write(socket_, *req_,
bhttp::async_write(socket_, *req_,
std::bind(&HttpClientSession::on_write,
shared_from_this(), std::placeholders::_1,
std::placeholders::_2));
@ -974,19 +178,19 @@ void HttpClientSession::on_write(error_code ec, size_t bytes_transferred) {
OXEN_LOG(trace, "Successfully transferred {} bytes", bytes_transferred);
// Receive the HTTP response
http::async_read(socket_, buffer_, res_,
bhttp::async_read(socket_, buffer_, res_,
std::bind(&HttpClientSession::on_read, shared_from_this(),
std::placeholders::_1, std::placeholders::_2));
}
void HttpClientSession::on_read(error_code ec, size_t bytes_transferred) {
if (!ec || (ec == http::error::end_of_stream)) {
if (!ec || (ec == bhttp::error::end_of_stream)) {
OXEN_LOG(trace, "Successfully received {} bytes.", bytes_transferred);
if (http::to_status_class(res_.result_int()) ==
http::status_class::successful) {
if (bhttp::to_status_class(res_.result_int()) ==
bhttp::status_class::successful) {
std::shared_ptr<std::string> body =
std::make_shared<std::string>(res_.body());
trigger_callback(SNodeError::NO_ERROR, std::move(body));
@ -1097,7 +301,6 @@ void HttpClientSession::clean_up() {
ec.message());
} else {
OXEN_LOG(trace, "Close http socket: {}", sockfd);
get_net_stats().record_socket_close(sockfd);
}
}
@ -1111,8 +314,6 @@ HttpClientSession::~HttpClientSession() {
sn_response_t{SNodeError::ERROR_OTHER, nullptr}));
}
get_net_stats().http_connections_out--;
this->clean_up();
}

View file

@ -22,21 +22,15 @@
namespace oxen {
inline constexpr auto OXEN_SENDER_SNODE_PUBKEY_HEADER = "X-Loki-Snode-PubKey";
inline constexpr auto OXEN_SNODE_SIGNATURE_HEADER = "X-Loki-Snode-Signature";
inline constexpr auto OXEN_SENDER_KEY_HEADER = "X-Sender-Public-Key";
inline constexpr auto OXEN_TARGET_SNODE_KEY = "X-Target-Snode-Key";
inline constexpr auto OXEN_LONG_POLL_HEADER = "X-Loki-Long-Poll";
inline constexpr auto SESSION_TIME_LIMIT = 60s;
class RateLimiter;
namespace http = boost::beast::http; // from <boost/beast/http.hpp>
namespace ssl = boost::asio::ssl; // from <boost/asio/ssl.hpp>
namespace bhttp = boost::beast::http;
namespace bssl = boost::asio::ssl;
using request_t = http::request<http::string_body>;
using response_t = http::response<http::string_body>;
using request_t = bhttp::request<bhttp::string_body>;
using response_t = bhttp::response<bhttp::string_body>;
std::shared_ptr<request_t> build_post_request(
const ed25519_pubkey& host, const char* target, std::string data);
@ -116,144 +110,6 @@ class HttpClientSession
~HttpClientSession();
};
namespace http_server {
class connection_t : public std::enable_shared_from_this<connection_t> {
using tcp = boost::asio::ip::tcp;
private:
boost::asio::io_context& ioc_;
ssl::context& ssl_ctx_;
// The socket for the currently connected client.
tcp::socket socket_;
// The buffer for performing reads.
boost::beast::flat_buffer buffer_{8192};
ssl::stream<tcp::socket&> stream_;
const Security& security_;
// Contains the request message
http::request_parser<http::string_body> request_;
// The response message.
response_t response_;
// whether the response should be sent asyncronously,
// as opposed to directly after connection_t::process_request
bool delay_response_ = false;
// TODO: remove SN, only use Reqeust Handler as a mediator
ServiceNode& service_node_;
RequestHandler& request_handler_;
RateLimiter& rate_limiter_;
// The timer for repeating an action within one connection
boost::asio::steady_timer repeat_timer_;
int repetition_count_ = 0;
std::chrono::time_point<std::chrono::steady_clock> start_timestamp_;
// The timer for putting a deadline on connection processing.
boost::asio::steady_timer deadline_;
/// TODO: move these if possible
std::map<std::string, std::string> header_;
std::stringstream body_stream_;
// Note that we are only sending a single message through the
// notification mechanism. If we somehow accumulated multiple
// messages before notification event happens (unlikely), the
// following messages will be delivered with the client's
// consequent (and immediate) retrieve request
struct notification_context_t {
// The timer used for internal db polling
boost::asio::steady_timer timer;
// the message is stored here momentarily; needed because
// we can't pass it using current notification mechanism
std::optional<message_t> message;
// Messenger public key that this connection is registered for
std::string pubkey;
};
std::optional<notification_context_t> notification_ctx_;
// If present, this function will be called just before
// writing the response
std::function<void(response_t&)> response_modifier_;
public:
connection_t(boost::asio::io_context& ioc, ssl::context& ssl_ctx,
tcp::socket socket, ServiceNode& sn, RequestHandler& rh,
RateLimiter& rate_limiter, const Security& security);
~connection_t();
// Connection index, mainly used for debugging
uint64_t conn_idx;
/// Initiate the asynchronous operations associated with the connection.
void start();
void notify(const message_t* msg);
private:
void do_handshake();
void on_handshake(boost::system::error_code ec);
/// Asynchronously receive a complete request message.
void read_request();
void do_close();
void on_shutdown(boost::system::error_code ec);
/// process GET /get_stats/v1
void on_get_stats();
/// Determine what needs to be done with the request message
/// (synchronously).
void process_request();
/// Unsubscribe listener (if any) and shutdown the connection
void clean_up();
/// Asynchronously transmit the response message.
void write_response();
/// Syncronously (?) process client store/load requests
void process_client_req_rate_limited();
void process_swarm_req(std::string_view target);
/// Process onion request from the client
void process_onion_req_v2();
// Check whether we have spent enough time on this connection.
void register_deadline();
/// Process storage test request and repeat if necessary
void process_storage_test_req(uint64_t height,
const legacy_pubkey& tester_addr,
const std::string& msg_hash);
void set_response(const Response& res);
bool parse_header(const char* key);
template <typename... Args>
bool parse_header(const char* first, Args... args);
bool validate_snode_request();
};
void run(boost::asio::io_context& ioc, const std::string& ip, uint16_t port,
const std::filesystem::path& base_path, ServiceNode& sn,
RequestHandler& rh, RateLimiter& rate_limiter, Security&);
} // namespace http_server
constexpr const char* error_string(SNodeError err) {
switch (err) {
case oxen::SNodeError::NO_ERROR:

View file

@ -1,8 +1,8 @@
#include "https_client.h"
#include "net_stats.h"
#include "oxen_logger.h"
#include "signature.h"
#include "sn_record.h"
#include "request_handler.h"
#include <openssl/x509.h>
#include <oxenmq/base64.h>
@ -12,7 +12,7 @@ namespace oxen {
using error_code = boost::system::error_code;
static ssl::context ctx{ssl::context::tlsv12_client};
static bssl::context ctx{bssl::context::tlsv12_client};
void make_https_request_to_sn(
boost::asio::io_context& ioc,
@ -79,7 +79,7 @@ void make_https_request(boost::asio::io_context& ioc, const std::string& host,
return;
}
static ssl::context ctx{ssl::context::tlsv12_client};
static bssl::context ctx{bssl::context::tlsv12_client};
auto session = std::make_shared<HttpsClientSession>(
ioc, ctx, std::move(resolve_results), host.c_str(), std::move(req),
@ -114,7 +114,7 @@ static std::string x509_to_string(X509* x509) {
}
HttpsClientSession::HttpsClientSession(
boost::asio::io_context& ioc, ssl::context& ssl_ctx,
boost::asio::io_context& ioc, bssl::context& ssl_ctx,
tcp::resolver::results_type resolve_results, const char* host,
std::shared_ptr<request_t> req, http_callback_t&& cb,
std::optional<legacy_pubkey> sn_pubkey)
@ -122,8 +122,6 @@ HttpsClientSession::HttpsClientSession(
callback_(cb), deadline_timer_(ioc), stream_(ioc, ssl_ctx_),
req_(std::move(req)), server_pubkey_(std::move(sn_pubkey)) {
get_net_stats().https_connections_out++;
response_.body_limit(1024 * 1024 * 10); // 10 mb
// Set SNI Hostname (many hosts need this to handshake successfully)
@ -182,12 +180,11 @@ void HttpsClientSession::on_connect() {
const auto sockfd = stream_.lowest_layer().native_handle();
OXEN_LOG(trace, "Open https client socket: {}", sockfd);
get_net_stats().record_socket_open(sockfd);
stream_.set_verify_mode(ssl::verify_none);
stream_.set_verify_mode(bssl::verify_none);
stream_.set_verify_callback(
[this](bool preverified, ssl::verify_context& ctx) -> bool {
[this](bool preverified, bssl::verify_context& ctx) -> bool {
if (!preverified) {
X509_STORE_CTX* handle = ctx.native_handle();
X509* x509 = X509_STORE_CTX_get0_cert(handle);
@ -195,7 +192,7 @@ void HttpsClientSession::on_connect() {
}
return true;
});
stream_.async_handshake(ssl::stream_base::client,
stream_.async_handshake(bssl::stream_base::client,
std::bind(&HttpsClientSession::on_handshake,
shared_from_this(),
std::placeholders::_1));
@ -209,7 +206,7 @@ void HttpsClientSession::on_handshake(boost::system::error_code ec) {
return;
}
http::async_write(stream_, *req_,
bhttp::async_write(stream_, *req_,
std::bind(&HttpsClientSession::on_write,
shared_from_this(), std::placeholders::_1,
std::placeholders::_2));
@ -228,7 +225,7 @@ void HttpsClientSession::on_write(error_code ec, size_t bytes_transferred) {
OXEN_LOG(trace, "Successfully transferred {} bytes.", bytes_transferred);
// Receive the HTTP response
http::async_read(stream_, buffer_, response_,
bhttp::async_read(stream_, buffer_, response_,
std::bind(&HttpsClientSession::on_read, shared_from_this(),
std::placeholders::_1, std::placeholders::_2));
}
@ -240,7 +237,7 @@ bool HttpsClientSession::verify_signature() {
const auto& response = response_.get();
const auto it = response.find(OXEN_SNODE_SIGNATURE_HEADER);
const auto it = response.find(http::SNODE_SIGNATURE_HEADER);
if (it == response.end()) {
OXEN_LOG(warn, "no signature found in header from {}",
*server_pubkey_);
@ -264,10 +261,10 @@ void HttpsClientSession::on_read(error_code ec, size_t bytes_transferred) {
const auto& response = response_.get();
if (!ec || (ec == http::error::end_of_stream)) {
if (!ec || (ec == bhttp::error::end_of_stream)) {
if (http::to_status_class(response.result_int()) ==
http::status_class::successful) {
if (bhttp::to_status_class(response.result_int()) ==
bhttp::status_class::successful) {
if (server_pubkey_ && !verify_signature()) {
OXEN_LOG(debug, "Bad signature from {}", *server_pubkey_);
@ -339,7 +336,6 @@ void HttpsClientSession::on_shutdown(boost::system::error_code ec) {
const auto sockfd = stream_.lowest_layer().native_handle();
OXEN_LOG(trace, "Close https socket: {}", sockfd);
get_net_stats().record_socket_close(sockfd);
stream_.lowest_layer().close();
@ -355,7 +351,5 @@ HttpsClientSession::~HttpsClientSession() {
ioc_.post(std::bind(callback_,
sn_response_t{SNodeError::ERROR_OTHER, nullptr}));
}
get_net_stats().https_connections_out--;
}
} // namespace oxen

View file

@ -25,7 +25,7 @@ class HttpsClientSession
using tcp = boost::asio::ip::tcp;
boost::asio::io_context& ioc_;
ssl::context& ssl_ctx_;
bssl::context& ssl_ctx_;
tcp::resolver::results_type resolve_results_;
http_callback_t callback_;
boost::asio::steady_timer deadline_timer_;
@ -33,14 +33,14 @@ class HttpsClientSession
// keep the cert in memory for post-handshake verification
std::string server_cert_;
ssl::stream<tcp::socket> stream_;
bssl::stream<tcp::socket> stream_;
boost::beast::flat_buffer buffer_;
/// NOTE: this needs to be a shared pointer since
/// it is very common for the same request to be
/// sent to multiple snodes
std::shared_ptr<request_t> req_;
http::response_parser<http::string_body> response_;
bhttp::response_parser<bhttp::string_body> response_;
// Snode's pub key (none if signature verification is not used / not a
// snode)
@ -66,7 +66,7 @@ class HttpsClientSession
public:
// Resolver and socket require an io_context
HttpsClientSession(boost::asio::io_context& ioc, ssl::context& ssl_ctx,
HttpsClientSession(boost::asio::io_context& ioc, bssl::context& ssl_ctx,
tcp::resolver::results_type resolve_results,
const char* host, std::shared_ptr<request_t> req,
http_callback_t&& cb,

View file

@ -1,6 +1,7 @@
#include "lmq_server.h"
#include "dev_sink.h"
#include "http.h"
#include "oxen_common.h"
#include "oxen_logger.h"
#include "oxend_key.h"
@ -78,24 +79,23 @@ void OxenmqServer::handle_sn_proxy_exit(oxenmq::Message& message) {
}
auto client_key = extract_x25519_from_hex(message.data[0]);
// TODO: Just not returning here is gross: the protocol needs some way to return an error state,
// but doesn't currently have one.
// TODO: Just not returning any response here is gross: the protocol needs some way to return an
// error state, but doesn't currently have one.
if (!client_key) return;
const auto& payload = message.data[1];
request_handler_->process_proxy_exit(
*client_key, payload,
[send=message.send_later()](oxen::Response res) {
OXEN_LOG(debug, " Proxy exit status: {}", res.status());
OXEN_LOG(debug, " Proxy exit status: {}", res.status.first);
if (res.status() == Status::OK) {
send.reply(res.message());
if (res.status == http::OK) {
send.reply(res.body);
} else {
// We reply with 2 message parts which will be treated as
// an error (rather than timeout)
send.reply(fmt::format("{}", res.status()), res.message());
OXEN_LOG(debug, "Error: status is not OK for proxy_exit: {}",
res.status());
send.reply(std::to_string(res.status.first), res.body);
OXEN_LOG(debug, "Error: status {} != OK for proxy_exit", res.status.first);
}
});
}
@ -115,13 +115,11 @@ void OxenmqServer::handle_onion_request(
if (OXEN_LOG_ENABLED(trace))
OXEN_LOG(trace, "on response: {}...", to_string(res).substr(0, 100));
send.reply(
std::to_string(static_cast<int>(res.status())),
std::move(res).message());
send.reply(std::to_string(res.status.first), std::move(res).body);
};
if (data.hop_no > MAX_ONION_HOPS)
return data.cb({Status::BAD_REQUEST, "onion request max path length exceeded"});
return data.cb({http::BAD_REQUEST, "onion request max path length exceeded"});
request_handler_->process_onion_req(payload, std::move(data));
}
@ -136,8 +134,7 @@ void OxenmqServer::handle_onion_request(oxenmq::Message& message) {
} catch (const std::exception& e) {
auto msg = "Invalid internal onion request: "s + e.what();
OXEN_LOG(error, "{}", msg);
message.send_reply(
std::to_string(static_cast<int>(Status::BAD_REQUEST)), msg);
message.send_reply(std::to_string(http::BAD_REQUEST.first), msg);
return;
}
@ -148,7 +145,7 @@ void OxenmqServer::handle_onion_req_v2(oxenmq::Message& message) {
OXEN_LOG(debug, "Got a v2 onion request over OxenMQ");
const int bad_code = static_cast<int>(Status::BAD_REQUEST);
constexpr int bad_code = http::BAD_REQUEST.first;
if (message.data.size() != 2) {
OXEN_LOG(error, "Expected 2 message parts, got {}",
message.data.size());
@ -250,7 +247,7 @@ OxenmqServer::OxenmqServer(
if (m.data.size() == 1 && m.data[0] == "ping"sv)
return handle_ping(m);
m.send_reply(
std::to_string(static_cast<int>(Status::BAD_REQUEST)),
std::to_string(http::BAD_REQUEST.first),
"onion requests v1 not supported");
})
// TODO: Backwards compat, only used up until HF18

View file

@ -1,11 +1,11 @@
#include "channel_encryption.hpp"
#include "command_line.h"
#include "http_connection.h"
#include "https_server.h"
#include "oxen_logger.h"
#include "oxend_key.h"
#include "oxend_rpc.h"
#include "rate_limiter.h"
#include "security.h"
#include "server_certificates.h"
#include "service_node.h"
#include "swarm.h"
#include "utils.hpp"
@ -34,8 +34,16 @@ extern "C" {
namespace fs = std::filesystem;
std::atomic<int> signalled = 0;
extern "C" void handle_signal(int sig) {
signalled = sig;
}
int main(int argc, char* argv[]) {
std::signal(SIGINT, handle_signal);
std::signal(SIGTERM, handle_signal);
oxen::command_line_parser parser;
try {
@ -112,8 +120,6 @@ int main(int argc, char* argv[]) {
OXEN_LOG(info, "OxenMQ is listening at {}:{}", options.ip,
options.lmq_port);
boost::asio::io_context ioc{1};
if (sodium_init() != 0) {
OXEN_LOG(error, "Could not initialize libsodium");
return EXIT_FAILURE;
@ -169,22 +175,37 @@ int main(int argc, char* argv[]) {
ChannelEncryption channel_encryption{private_key_x25519, me.pubkey_x25519};
auto ssl_cert = data_dir / "cert.pem";
auto ssl_key = data_dir / "key.pem";
auto ssl_dh = data_dir / "dh.pem";
if (!exists(ssl_cert) || !exists(ssl_key))
generate_cert(ssl_cert, ssl_key);
if (!exists(ssl_dh))
generate_dh_pem(ssl_dh);
// Set up oxenmq now, but don't actually start it until after we set up the ServiceNode
// instance (because ServiceNode and OxenmqServer reference each other).
OxenmqServer oxenmq_server{me, private_key_x25519, stats_access_keys};
auto oxenmq_server_ptr = std::make_unique<OxenmqServer>(me, private_key_x25519, stats_access_keys);
auto& oxenmq_server = *oxenmq_server_ptr;
// TODO: SN doesn't need oxenmq_server, just the lmq components
ServiceNode service_node(ioc, me, private_key, oxenmq_server,
data_dir, options.force_start);
// Add a category for handling incoming https requests
auto https_threads = std::max<unsigned>(std::thread::hardware_concurrency(), 3);
oxenmq_server->add_category("https",
oxenmq::AuthLevel::basic, https_threads, 1000 /* max queued requests */);
ServiceNode service_node{
me, private_key, oxenmq_server, data_dir, options.force_start};
RequestHandler request_handler{service_node, channel_encryption};
HTTPSServer https_server{service_node, request_handler, {{options.ip, options.port, true}},
ssl_cert, ssl_key, ssl_dh, {me.pubkey_legacy, private_key}};
RequestHandler request_handler(ioc, service_node, channel_encryption);
oxenmq_server.init(&service_node, &request_handler,
oxenmq::address{options.oxend_omq_rpc});
RateLimiter rate_limiter;
Security security(legacy_keypair{me.pubkey_legacy, private_key}, data_dir);
https_server.start();
#ifdef ENABLE_SYSTEMD
sd_notify(0, "READY=1");
@ -193,9 +214,16 @@ int main(int argc, char* argv[]) {
}, 10s);
#endif
http_server::run(ioc, options.ip, options.port, data_dir,
service_node, request_handler, rate_limiter,
security);
while (signalled.load() == 0)
std::this_thread::sleep_for(100ms);
OXEN_LOG(warn, "Received signal {}; shutting down...", signalled.load());
service_node.shutdown();
OXEN_LOG(info, "Stopping https server");
https_server.shutdown(true);
OXEN_LOG(info, "Stopping omq server");
oxenmq_server_ptr.reset();
OXEN_LOG(info, "Shutting down");
} catch (const std::exception& e) {
// It seems possible for logging to throw its own exception,
// in which case it will be propagated to libc...

View file

@ -1,36 +0,0 @@
#pragma once
#include "oxen_logger.h"
#include <set>
struct net_stats_t {
std::atomic<uint32_t> connections_in{0};
std::atomic<uint32_t> http_connections_out{0};
std::atomic<uint32_t> https_connections_out{0};
std::set<int> open_fds;
void record_socket_open(int sockfd) {
#ifdef INTEGRATION_TEST
if (open_fds.find(sockfd) != open_fds.end()) {
OXEN_LOG(critical, "Already recorded as open: {}!", sockfd);
}
open_fds.insert(sockfd);
#endif
}
void record_socket_close(int sockfd) {
#ifdef INTEGRATION_TEST
if (open_fds.find(sockfd) == open_fds.end()) {
OXEN_LOG(critical, "Socket is NOT recorded as open: {}", sockfd);
}
open_fds.erase(sockfd);
#endif
}
};
inline net_stats_t& get_net_stats() {
static net_stats_t stats;
return stats;
}

View file

@ -1,4 +1,5 @@
#include "channel_encryption.hpp"
#include "http.h"
#include "oxen_logger.h"
#include "request_handler.h"
#include "service_node.h"
@ -10,8 +11,8 @@
#include "onion_processing.h"
#include "utils.hpp"
#include "string_utils.hpp"
#include <charconv>
#include <variant>
using nlohmann::json;
@ -93,53 +94,13 @@ process_ciphertext_v2(const ChannelEncryption& decryptor,
return process_inner_request(std::move(*plaintext));
}
static auto make_status(std::string_view status) -> oxen::Status {
int code;
auto res =
std::from_chars(status.data(), status.data() + status.size(), code);
if (res.ec == std::errc::invalid_argument ||
res.ec == std::errc::result_out_of_range) {
return Status::INTERNAL_SERVER_ERROR;
}
switch (code) {
case 200:
return Status::OK;
case 400:
return Status::BAD_REQUEST;
case 403:
return Status::FORBIDDEN;
case 406:
return Status::NOT_ACCEPTABLE;
case 421:
return Status::MISDIRECTED_REQUEST;
case 432:
return Status::INVALID_POW;
case 500:
return Status::INTERNAL_SERVER_ERROR;
case 502:
return Status::BAD_GATEWAY;
case 503:
return Status::SERVICE_UNAVAILABLE;
case 504:
return Status::GATEWAY_TIMEOUT;
default:
return Status::INTERNAL_SERVER_ERROR;
}
}
// FIXME: why are these method definitions *here* instead of request_handler.cpp?
void RequestHandler::process_onion_req(std::string_view ciphertext,
OnionRequestMetadata data) {
if (!service_node_.snode_ready()) {
auto msg =
fmt::format("Snode not ready: {}",
service_node_.own_address().pubkey_ed25519);
return data.cb({Status::SERVICE_UNAVAILABLE, std::move(msg)});
}
if (!service_node_.snode_ready())
return data.cb({
http::SERVICE_UNAVAILABLE,
fmt::format("Snode not ready: {}", service_node_.own_address().pubkey_ed25519)});
OXEN_LOG(debug, "process_onion_req");
@ -167,31 +128,32 @@ void RequestHandler::process_onion_req(RelayToNodeInfo&& info,
if (!dest_node) {
auto msg = fmt::format("Next node not found: {}", dest);
OXEN_LOG(warn, "{}", msg);
return data.cb({Status::BAD_GATEWAY, std::move(msg)});
return data.cb({http::BAD_GATEWAY, std::move(msg)});
}
auto on_response = [cb=std::move(data.cb)](bool success,
std::vector<std::string> data) {
auto on_response = [cb=std::move(data.cb)](bool success, std::vector<std::string> data) {
// Processing the result we got from upstream
if (!success) {
OXEN_LOG(debug, "[Onion request] Request time out");
return cb({Status::GATEWAY_TIMEOUT, "Request time out"});
return cb({http::GATEWAY_TIMEOUT, "Request time out"});
}
// We only expect a two-part message
if (data.size() != 2) {
OXEN_LOG(debug, "[Onion request] Incorrect number of messages: {}",
data.size());
return cb({Status::INTERNAL_SERVER_ERROR,
"Incorrect number of messages from gateway"});
// We expect a two-part message, but for forwards compatibility allow extra parts
if (data.size() < 2) {
OXEN_LOG(debug, "[Onion request] Invalid response; expected at least 2 parts");
return cb({http::INTERNAL_SERVER_ERROR, "Invalid response from snode"});
}
Response res{http::INTERNAL_SERVER_ERROR, std::move(data[1]), http::json};
if (int code; util::parse_int(data[0], code))
res.status = http::from_code(code);
/// We use http status codes (for now)
if (data[0] != "200")
OXEN_LOG(debug, "Onion request relay failed with: {}", data[1]);
if (res.status != http::OK)
OXEN_LOG(debug, "Onion request relay failed with: {}", res.body);
cb({make_status(data[0]), std::move(data[1])});
cb(std::move(res));
};
OXEN_LOG(debug, "send_onion_to_sn, sn: {}", dest_node->pubkey_legacy);
@ -203,14 +165,14 @@ void RequestHandler::process_onion_req(RelayToNodeInfo&& info,
}
bool is_server_url_allowed(std::string_view url) {
return (util::starts_with(url, "/loki/") ||
util::starts_with(url, "/oxen/")) &&
util::ends_with(url, "/lsrpc") &&
(url.find('?') == std::string::npos);
return
(util::starts_with(url, "/loki/") || util::starts_with(url, "/oxen/")) &&
util::ends_with(url, "/lsrpc") &&
url.find('?') == std::string::npos;
}
void RequestHandler::process_onion_req(RelayToServerInfo&& info,
OnionRequestMetadata&& data) {
void RequestHandler::process_onion_req(
RelayToServerInfo&& info, OnionRequestMetadata&& data) {
OXEN_LOG(debug, "We are to forward the request to url: {}{}",
info.host, info.target);
@ -219,7 +181,7 @@ void RequestHandler::process_onion_req(RelayToServerInfo&& info,
return process_onion_to_url(info.protocol, std::move(info.host), info.port,
std::move(info.target), std::move(info.payload), std::move(data.cb));
return data.cb(wrap_proxy_response({Status::BAD_REQUEST, "Invalid url"},
return data.cb(wrap_proxy_response({http::BAD_REQUEST, "Invalid url"},
data.ephem_key, data.enc_type));
}
@ -228,11 +190,9 @@ void RequestHandler::process_onion_req(ProcessCiphertextError&& error,
switch (error) {
case ProcessCiphertextError::INVALID_CIPHERTEXT:
// Should this error be propagated back to the client? (No, if we
// couldn't decrypt, we probably won't be able to encrypt either.)
return data.cb({Status::BAD_REQUEST, "Invalid ciphertext"});
return data.cb({http::BAD_REQUEST, "Invalid ciphertext"});
case ProcessCiphertextError::INVALID_JSON:
return data.cb(wrap_proxy_response({Status::BAD_REQUEST, "Invalid json"},
return data.cb(wrap_proxy_response({http::BAD_REQUEST, "Invalid json"},
data.ephem_key, data.enc_type));
}
}
@ -250,14 +210,13 @@ auto parse_combined_payload(std::string_view payload) -> CiphertextPlusJson {
}
uint32_t n;
std::memcpy(&n, payload.data(), sizeof(uint32_t));
std::memcpy(&n, payload.data(), 4);
payload.remove_prefix(4);
boost::endian::little_to_native_inplace(n);
OXEN_LOG(trace, "Ciphertext length: {}", n);
payload.remove_prefix(sizeof(uint32_t));
if (payload.size() < n) {
auto msg = fmt::format("Unexpected payload size {}, expected {}", payload.size(), n);
auto msg = fmt::format("Unexpected payload size {}, expected >= {}", payload.size(), n);
OXEN_LOG(warn, "{}", msg);
throw std::runtime_error{msg};
}

View file

@ -3,6 +3,7 @@
#include "http_connection.h"
#include "lmq_server.h"
#include "oxen_logger.h"
#include "signature.h"
#include "service_node.h"
#include "utils.hpp"
@ -18,27 +19,20 @@ using nlohmann::json;
namespace oxen {
constexpr size_t MAX_MESSAGE_BODY = 102400; // 100 KB limit
std::string to_string(const Response& res) {
std::stringstream ss;
ss << "Status: " << static_cast<int>(res.status()) << ", ";
ss << "ContentType: "
<< ((res.content_type() == ContentType::plaintext) ? "plaintext"
: "json")
<< ", ";
ss << "Body: <" << res.message() << ">";
ss << "Status: " << res.status.first << " " << res.status.second
<< ", Content-Type: " << (res.content_type.empty() ? "(unspecified)" : res.content_type)
<< ", Body: <" << res.body << ">";
return ss.str();
}
RequestHandler::RequestHandler(boost::asio::io_context& ioc, ServiceNode& sn,
const ChannelEncryption& ce)
: ioc_(ioc), service_node_(sn), channel_cipher_(ce) {}
namespace {
static json snodes_to_json(const std::vector<sn_record_t>& snodes) {
json snodes_to_json(const std::vector<sn_record_t>& snodes) {
json res_body;
json snodes_json = json::array();
@ -53,12 +47,12 @@ static json snodes_to_json(const std::vector<sn_record_t>& snodes) {
{"ip", sn.ip}});
}
res_body["snodes"] = snodes_json;
res_body["snodes"] = std::move(snodes_json);
return res_body;
}
static std::string obfuscate_pubkey(std::string_view pk) {
std::string obfuscate_pubkey(std::string_view pk) {
std::string res;
res += pk.substr(0, 2);
res += "...";
@ -66,43 +60,45 @@ static std::string obfuscate_pubkey(std::string_view pk) {
return res;
}
/// TODO: this probably shouldn't return Response...
Response RequestHandler::handle_wrong_swarm(const user_pubkey_t& pubKey) {
std::string computeMessageHash(std::vector<std::string_view> parts, bool hex) {
SHA512_CTX ctx;
SHA512_Init(&ctx);
for (const auto& s : parts)
SHA512_Update(&ctx, s.data(), s.size());
const std::vector<sn_record_t> nodes =
service_node_.get_snodes_by_pk(pubKey);
const json res_body = snodes_to_json(nodes);
std::string hashResult;
hashResult.resize(SHA512_DIGEST_LENGTH);
SHA512_Final(reinterpret_cast<unsigned char*>(hashResult.data()), &ctx);
if (hex)
hashResult = oxenmq::to_hex(hashResult);
return hashResult;
}
} // anon. namespace
RequestHandler::RequestHandler(
ServiceNode& sn,
const ChannelEncryption& ce)
: service_node_{sn}, channel_cipher_(ce) {}
Response RequestHandler::handle_wrong_swarm(const user_pubkey_t& pubKey) {
OXEN_LOG(trace, "Got client request to a wrong swarm");
return Response{Status::MISDIRECTED_REQUEST, res_body.dump(),
ContentType::json};
}
std::string computeMessageHash(const std::string& timestamp,
const std::string& ttl,
const std::string& recipient,
const std::string& data) {
SHA512_CTX ctx;
SHA512_Init(&ctx);
for (const auto* s : {&timestamp, &ttl, &recipient, &data})
SHA512_Update(&ctx, s->data(), s->size());
unsigned char hashResult[SHA512_DIGEST_LENGTH];
SHA512_Final(hashResult, &ctx);
return oxenmq::to_hex(std::begin(hashResult), std::end(hashResult));
return {
http::MISDIRECTED_REQUEST,
snodes_to_json(service_node_.get_snodes_by_pk(pubKey)).dump(),
http::json};
}
Response RequestHandler::process_store(const json& params) {
constexpr const char* fields[] = {"pubKey", "ttl", "timestamp", "data"};
for (const auto& field : fields) {
for (const auto& field : {"pubKey", "ttl", "timestamp", "data"}) {
if (!params.contains(field)) {
OXEN_LOG(debug, "Bad client request: no `{}` field", field);
return Response{
Status::BAD_REQUEST,
return {
http::BAD_REQUEST,
fmt::format("invalid json: no `{}` field\n", field)};
}
}
@ -122,7 +118,7 @@ Response RequestHandler::process_store(const json& params) {
auto msg = fmt::format("Pubkey must be {} characters long\n",
get_user_pubkey_size());
OXEN_LOG(debug, "{}", msg);
return Response{Status::BAD_REQUEST, std::move(msg)};
return {http::BAD_REQUEST, std::move(msg)};
}
if (data.size() > MAX_MESSAGE_BODY) {
@ -131,7 +127,7 @@ Response RequestHandler::process_store(const json& params) {
auto msg =
fmt::format("Message body exceeds maximum allowed length of {}\n",
MAX_MESSAGE_BODY);
return Response{Status::BAD_REQUEST, std::move(msg)};
return {http::BAD_REQUEST, std::move(msg)};
}
if (!service_node_.is_pubkey_for_us(pk)) {
@ -141,36 +137,33 @@ Response RequestHandler::process_store(const json& params) {
uint64_t ttlInt;
if (!util::parseTTL(ttl, ttlInt)) {
OXEN_LOG(debug, "Forbidden. Invalid TTL: {}", ttl);
return Response{Status::FORBIDDEN, "Provided TTL is not valid.\n"};
return {http::FORBIDDEN, "Provided TTL is not valid.\n"};
}
uint64_t timestampInt;
if (!util::parseTimestamp(timestamp, ttlInt, timestampInt)) {
OXEN_LOG(debug, "Forbidden. Invalid Timestamp: {}", timestamp);
return Response{Status::NOT_ACCEPTABLE,
"Timestamp error: check your clock\n"};
return {http::NOT_ACCEPTABLE, "Timestamp error: check your clock\n"};
}
auto messageHash = computeMessageHash(timestamp, ttl, pk.str(), data);
auto messageHash = computeMessageHash({timestamp, ttl, pk.str(), data}, true);
bool success;
try {
const auto msg =
message_t{pk.str(), data, messageHash, ttlInt, timestampInt};
success = service_node_.process_store(msg);
success = service_node_.process_store({pk.str(), data, messageHash, ttlInt, timestampInt});
} catch (const std::exception& e) {
OXEN_LOG(critical,
"Internal Server Error. Could not store message for {}",
obfuscate_pubkey(pk.str()));
return Response{Status::INTERNAL_SERVER_ERROR, e.what()};
return {http::INTERNAL_SERVER_ERROR, e.what()};
}
if (!success) {
OXEN_LOG(warn, "Service node is initializing");
return Response{Status::SERVICE_UNAVAILABLE,
"Service node is initializing\n"};
return {http::SERVICE_UNAVAILABLE,
"Service node is initializing\n"};
}
OXEN_LOG(trace, "Successfully stored message for {}",
@ -181,7 +174,7 @@ Response RequestHandler::process_store(const json& params) {
/// we send this to avoid breaking older clients.
res_body["difficulty"] = 1;
return Response{Status::OK, res_body.dump(), ContentType::json};
return {http::OK, res_body.dump(), http::json};
}
inline const static std::unordered_set<std::string> allowed_oxend_endpoints{{
@ -193,17 +186,17 @@ void RequestHandler::process_oxend_request(
std::string endpoint;
if (auto it = params.find("endpoint");
it == params.end() || !it->is_string())
return cb({Status::BAD_REQUEST, "missing 'endpoint'"});
return cb({http::BAD_REQUEST, "missing 'endpoint'"});
else
endpoint = it->get<std::string>();
if (!allowed_oxend_endpoints.count(endpoint))
return cb({Status::BAD_REQUEST, "Endpoint not allowed: " + endpoint});
return cb({http::BAD_REQUEST, "Endpoint not allowed: " + endpoint});
std::optional<std::string> oxend_params;
if (auto it = params.find("params"); it != params.end()) {
if (!it->is_object())
return cb({Status::BAD_REQUEST, "invalid oxend 'params' argument"});
return cb({http::BAD_REQUEST, "invalid oxend 'params' argument"});
oxend_params = it->dump();
}
@ -215,10 +208,10 @@ void RequestHandler::process_oxend_request(
// (which end in ".bin") at some point in the future then we'll need to return those
// endpoint results differently here.
if (success && data.size() >= 2 && data[0] == "200")
return cb({Status::OK,
return cb({http::OK,
R"({"result":)" + std::move(data[1]) + "}",
ContentType::json});
return cb({Status::BAD_REQUEST,
http::json});
return cb({http::BAD_REQUEST,
data.size() >= 2 && !data[1].empty()
? std::move(data[1]) : "Unknown oxend error"s});
},
@ -232,7 +225,7 @@ Response RequestHandler::process_retrieve_all() {
bool res = service_node_.get_all_messages(all_entries);
if (!res) {
return Response{Status::INTERNAL_SERVER_ERROR,
return Response{http::INTERNAL_SERVER_ERROR,
"could not retrieve all entries\n"};
}
@ -248,14 +241,15 @@ Response RequestHandler::process_retrieve_all() {
json res_body;
res_body["messages"] = messages;
return Response{Status::OK, res_body.dump(), ContentType::json};
return Response{http::OK, res_body.dump(), http::json};
}
Response RequestHandler::process_snodes_by_pk(const json& params) const {
if (!params.contains("pubKey")) {
auto it = params.find("pubKey");
if (it == params.end()) {
OXEN_LOG(debug, "Bad client request: no `pubKey` field");
return Response{Status::BAD_REQUEST,
return {http::BAD_REQUEST,
"invalid json: no `pubKey` field\n"};
}
@ -267,7 +261,7 @@ Response RequestHandler::process_snodes_by_pk(const json& params) const {
auto msg = fmt::format("Pubkey must be {} hex digits long\n",
get_user_pubkey_size());
OXEN_LOG(debug, "{}", msg);
return Response{Status::BAD_REQUEST, std::move(msg)};
return Response{http::BAD_REQUEST, std::move(msg)};
}
const std::vector<sn_record_t> nodes = service_node_.get_snodes_by_pk(pk);
@ -278,7 +272,7 @@ Response RequestHandler::process_snodes_by_pk(const json& params) const {
OXEN_LOG(debug, "Snodes by pk: {}", res_body.dump());
return Response{Status::OK, res_body.dump(), ContentType::json};
return Response{http::OK, res_body.dump(), http::json};
}
Response RequestHandler::process_retrieve(const json& params) {
@ -289,7 +283,7 @@ Response RequestHandler::process_retrieve(const json& params) {
if (!params.contains(field)) {
auto msg = fmt::format("invalid json: no `{}` field", field);
OXEN_LOG(debug, "{}", msg);
return Response{Status::BAD_REQUEST, std::move(msg)};
return Response{http::BAD_REQUEST, std::move(msg)};
}
}
@ -302,7 +296,7 @@ Response RequestHandler::process_retrieve(const json& params) {
auto msg = fmt::format("Pubkey must be {} characters long\n",
get_user_pubkey_size());
OXEN_LOG(debug, "{}", msg);
return Response{Status::BAD_REQUEST, std::move(msg)};
return Response{http::BAD_REQUEST, std::move(msg)};
}
if (!service_node_.is_pubkey_for_us(pk)) {
@ -323,7 +317,7 @@ Response RequestHandler::process_retrieve(const json& params) {
obfuscate_pubkey(pk.str()));
OXEN_LOG(critical, "{}", msg);
return Response{Status::INTERNAL_SERVER_ERROR, std::move(msg)};
return Response{http::INTERNAL_SERVER_ERROR, std::move(msg)};
}
if (!items.empty()) {
@ -345,7 +339,7 @@ Response RequestHandler::process_retrieve(const json& params) {
res_body["messages"] = messages;
return Response{Status::OK, res_body.dump(), ContentType::json};
return Response{http::OK, res_body.dump(), http::json};
}
void RequestHandler::process_client_req(
@ -356,7 +350,7 @@ void RequestHandler::process_client_req(
const json body = json::parse(req_json, nullptr, false);
if (body.is_discarded()) {
OXEN_LOG(debug, "Bad client request: invalid json");
return cb(Response{Status::BAD_REQUEST, "invalid json\n"});
return cb(Response{http::BAD_REQUEST, "invalid json\n"});
}
if (OXEN_LOG_ENABLED(trace))
@ -365,7 +359,7 @@ void RequestHandler::process_client_req(
const auto method_it = body.find("method");
if (method_it == body.end() || !method_it->is_string()) {
OXEN_LOG(debug, "Bad client request: no method field");
return cb(Response{Status::BAD_REQUEST, "invalid json: no `method` field\n"});
return cb(Response{http::BAD_REQUEST, "invalid json: no `method` field\n"});
}
const auto& method_name = method_it->get_ref<const std::string&>();
@ -375,7 +369,7 @@ void RequestHandler::process_client_req(
const auto params_it = body.find("params");
if (params_it == body.end() || !params_it->is_object()) {
OXEN_LOG(debug, "Bad client request: no params field");
return cb(Response{Status::BAD_REQUEST, "invalid json: no `params` field\n"});
return cb(Response{http::BAD_REQUEST, "invalid json: no `params` field\n"});
}
if (method_name == "store") {
@ -400,12 +394,62 @@ void RequestHandler::process_client_req(
if (method_name == "get_lns_mapping") {
const auto name_it = params_it->find("name_hash");
if (name_it == params_it->end())
return cb({Status::BAD_REQUEST, "Field <name_hash> is missing"});
return cb({http::BAD_REQUEST, "Field <name_hash> is missing"});
return process_lns_request(*name_it, std::move(cb));
}
OXEN_LOG(debug, "Bad client request: unknown method '{}'", method_name);
return cb({Status::BAD_REQUEST, "no method " + method_name});
return cb({http::BAD_REQUEST, "no method " + method_name});
}
legacy_pubkey parse_pubkey(std::string_view public_key_in) {
legacy_pubkey pk{};
if (public_key_in.size() == 64 && oxenmq::is_hex(public_key_in))
oxenmq::from_hex(public_key_in.begin(), public_key_in.end(), pk.begin());
else if ((public_key_in.size() == 43 || (public_key_in.size() == 44 && public_key_in.back() == '='))
&& oxenmq::is_base64(public_key_in))
oxenmq::from_base64(public_key_in.begin(), public_key_in.end(), pk.begin());
else if (public_key_in.size() == 52 && oxenmq::is_base32z(public_key_in))
oxenmq::from_base32z(public_key_in.begin(), public_key_in.end(), pk.begin());
else {
OXEN_LOG(warn, "Invalid public key header: not hex, b64, or b32z encoded");
OXEN_LOG(debug, "Received public key encoded value: {}", public_key_in);
}
return pk;
}
std::variant<legacy_pubkey, Response> RequestHandler::validate_snode_signature(const Request& r, bool headers_only) {
legacy_pubkey pubkey;
if (auto it = r.headers.find(http::SNODE_SENDER_HEADER); it != r.headers.end())
pubkey = parse_pubkey(it->second);
if (!pubkey) {
OXEN_LOG(debug, "Missing or invalid pubkey header for request");
return Response{http::BAD_REQUEST, "missing/invalid pubkey header"};
}
signature sig;
if (auto it = r.headers.find(http::SNODE_SIGNATURE_HEADER); it != r.headers.end()) {
try { sig = signature::from_base64(it->second); }
catch (...) {
OXEN_LOG(warn, "invalid signature (not b64) found in header from {}", pubkey);
return Response{http::BAD_REQUEST, "Invalid signature"};
}
} else {
OXEN_LOG(debug, "Missing required signature header for request");
return Response{http::BAD_REQUEST, "missing signature header"};
}
if (!service_node_.find_node(pubkey)) {
OXEN_LOG(debug, "Rejecting signature from unknown service node: {}", pubkey);
return Response{http::UNAUTHORIZED, "Unknown service node"};
}
if (!headers_only) {
if (!check_signature(sig, hash_data(r.body), pubkey)) {
OXEN_LOG(debug, "snode signature verification failed for pubkey {}", pubkey);
return Response{http::UNAUTHORIZED, "snode signature verification failed"};
}
}
return pubkey;
}
Response RequestHandler::wrap_proxy_response(Response res,
@ -414,20 +458,19 @@ Response RequestHandler::wrap_proxy_response(Response res,
bool embed_json,
bool base64) const {
auto status = static_cast<std::underlying_type_t<Status>>(res.status());
int status = res.status.first;
std::string body;
if (embed_json && res.content_type() == ContentType::json)
body = fmt::format(R"({{"status":{},"body":{}}})",
status, res.message());
if (embed_json && res.content_type == http::json)
body = fmt::format(R"({{"status":{},"body":{}}})", status, res.body);
else
body = json{{"status", status}, {"body", res.message()}}.dump();
body = json{{"status", status}, {"body", res.body}}.dump();
std::string ciphertext = channel_cipher_.encrypt(enc_type, body, client_key);
if (base64)
ciphertext = oxenmq::to_base64(std::move(ciphertext));
// why does this have to be json???
return Response{Status::OK, std::move(ciphertext), ContentType::json};
return Response{http::OK, std::move(ciphertext), http::json};
}
void RequestHandler::process_lns_request(
@ -449,21 +492,21 @@ void RequestHandler::process_lns_request(
#ifdef INTEGRATION_TEST
// use mainnet seed
oxend_json_rpc_request(
ioc_, "public.loki.foundation", 22023, "lns_names_to_owners", params,
service_node_.ioc(), "public.loki.foundation", 22023, "lns_names_to_owners", params,
[cb = std::move(cb)](sn_response_t sn) {
if (sn.error_code == SNodeError::NO_ERROR && sn.body)
cb({Status::OK, *sn.body});
cb({http::OK, *sn.body});
else
cb({Status::BAD_REQUEST, "unknown oxend error"});
cb({http::BAD_REQUEST, "unknown oxend error"});
});
#else
service_node_.omq_server().oxend_request(
"rpc.lns_names_to_owners",
[cb = std::move(cb)](bool success, auto&& data) {
if (success && !data.empty())
cb({Status::OK, data.front()});
cb({http::OK, data.front()});
else
cb({Status::BAD_REQUEST, "unknown oxend error"});
cb({http::BAD_REQUEST, "unknown oxend error"});
});
#endif
}
@ -475,7 +518,7 @@ void RequestHandler::process_onion_exit(
OXEN_LOG(debug, "Processing onion exit!");
if (!service_node_.snode_ready())
return cb({Status::SERVICE_UNAVAILABLE, "Snode not ready"});
return cb({http::SERVICE_UNAVAILABLE, "Snode not ready"});
this->process_client_req(body, std::move(cb));
}
@ -487,7 +530,7 @@ void RequestHandler::process_proxy_exit(
if (!service_node_.snode_ready())
return cb(wrap_proxy_response(
{Status::SERVICE_UNAVAILABLE, "Snode not ready"},
{http::SERVICE_UNAVAILABLE, "Snode not ready"},
client_key, EncryptType::aes_cbc));
static int proxy_idx = 0;
@ -506,41 +549,25 @@ void RequestHandler::process_proxy_exit(
// TODO: since we always seem to encrypt the response, we should
// do it once one level above instead
return cb(wrap_proxy_response({Status::BAD_REQUEST, std::move(msg)},
return cb(wrap_proxy_response({http::BAD_REQUEST, std::move(msg)},
client_key, EncryptType::aes_cbc));
}
std::string body;
bool lp_used = false;
try {
const json req = json::parse(plaintext, nullptr, true);
body = req.at("body").get<std::string>();
if (req.find("headers") != req.end()) {
if (req.at("headers").find(OXEN_LONG_POLL_HEADER) !=
req.at("headers").end()) {
lp_used =
req.at("headers").at(OXEN_LONG_POLL_HEADER).get<bool>();
}
}
} catch (const std::exception& e) {
auto msg = fmt::format("JSON parsing error: {}", e.what());
OXEN_LOG(debug, "[{}] {}", idx, msg);
return cb(wrap_proxy_response(
{Status::BAD_REQUEST, std::move(msg)}, client_key, EncryptType::aes_cbc));
}
if (lp_used) {
OXEN_LOG(debug, "Long polling requested over a proxy request");
{http::BAD_REQUEST, std::move(msg)}, client_key, EncryptType::aes_cbc));
}
this->process_client_req(
body, [this, cb = std::move(cb), client_key, idx](oxen::Response res) {
OXEN_LOG(debug, "[{}] proxy about to respond with: {}", idx,
res.status());
OXEN_LOG(debug, "[{}] proxy about to respond with: {}", idx, res.status.first);
cb(wrap_proxy_response(std::move(res), client_key, EncryptType::aes_cbc));
});
@ -555,8 +582,8 @@ void RequestHandler::process_onion_to_url(
auto req = std::make_shared<request_t>();
req->body() = payload;
req->set(http::field::host, host);
req->method(http::verb::post);
req->set(bhttp::field::host, host);
req->method(bhttp::verb::post);
req->target(target);
req->prepare_payload();
@ -564,17 +591,17 @@ void RequestHandler::process_onion_to_url(
// `cb` needs to be adapted for http request
auto http_cb = [cb = std::move(cb)](sn_response_t res) {
if (res.error_code == SNodeError::NO_ERROR) {
cb(oxen::Response{Status::OK, *res.body});
cb(oxen::Response{http::OK, *res.body});
} else {
OXEN_LOG(debug, "Oxen server error: {}", res.error_code);
cb(oxen::Response{Status::BAD_REQUEST, "Oxen Server error"});
cb(oxen::Response{http::BAD_REQUEST, "Oxen Server error"});
}
};
if (protocol != "https") {
make_http_request(ioc_, host, port, req, http_cb);
make_http_request(service_node_.ioc(), host, port, req, http_cb);
} else {
make_https_request(ioc_, host, port, req, http_cb);
make_https_request(service_node_.ioc(), host, port, req, http_cb);
}
}

View file

@ -1,9 +1,11 @@
#pragma once
#include "channel_encryption.hpp"
#include "http.h"
#include "onion_processing.h"
#include "oxen_common.h"
#include "oxend_key.h"
#include "string_utils.hpp"
#include <string>
#include <string_view>
@ -13,26 +15,10 @@
namespace oxen {
constexpr size_t MAX_MESSAGE_BODY = 102400; // 100 KB limit
class ServiceNode;
enum class Status {
OK = 200,
BAD_REQUEST = 400,
FORBIDDEN = 403,
NOT_ACCEPTABLE = 406,
MISDIRECTED_REQUEST = 421,
INVALID_POW = 432, // unassigned http code
SERVICE_UNAVAILABLE = 503,
INTERNAL_SERVER_ERROR = 500,
BAD_GATEWAY = 502,
GATEWAY_TIMEOUT = 504,
};
enum class ContentType {
plaintext,
json,
};
namespace ss_client {
enum class ReqMethod {
@ -41,60 +27,50 @@ enum class ReqMethod {
ONION_REQUEST,
};
class Request {
public:
std::string body;
// Might change this to a vector later
std::map<std::string, std::string> headers;
};
}; // namespace ss_client
class Response {
Status status_;
std::string message_;
ContentType content_type_;
public:
Response(Status s, std::string m, ContentType ct = ContentType::plaintext)
: status_(s), message_(std::move(m)), content_type_(ct) {}
const std::string& message() const & { return message_; }
std::string&& message() && { return std::move(message_); }
Status status() const { return status_; }
ContentType content_type() const { return content_type_; }
// Simpler wrappers that work for most of our requests
struct Request {
std::string body;
http::headers headers;
std::string remote_addr;
std::string uri;
};
struct Response {
http::response_code status = http::OK;
std::string body;
std::string_view content_type = http::plaintext;
std::vector<std::pair<std::string, std::string>> headers;
};
std::string to_string(const Response& res);
/// Compute message's hash based on its constituents.
std::string computeMessageHash(const std::string& timestamp,
const std::string& ttl,
const std::string& recipient,
const std::string& data);
/// Parse a pubkey string value as either base32z (deprecated!), b64, or hex. Returns a null pk
/// (i.e. operator bool() returns false) and warns on invalid input.
legacy_pubkey parse_pubkey(std::string_view public_key_in);
struct OnionRequestMetadata {
x25519_pubkey ephem_key;
std::function<void(oxen::Response)> cb;
std::function<void(Response)> cb;
int hop_no = 0;
EncryptType enc_type = EncryptType::aes_gcm;
};
class RequestHandler {
boost::asio::io_context& ioc_;
ServiceNode& service_node_;
const ChannelEncryption& channel_cipher_;
// Wrap response `res` to an intermediate node
Response wrap_proxy_response(Response res,
const x25519_pubkey& client_key,
EncryptType enc_type,
bool json = false,
bool base64 = true) const;
Response wrap_proxy_response(
Response res,
const x25519_pubkey& client_key,
EncryptType enc_type,
bool json = false,
bool base64 = true) const;
// Return the correct swarm for `pubKey`
Response handle_wrong_swarm(const user_pubkey_t& pubKey);
@ -112,20 +88,33 @@ class RequestHandler {
Response process_retrieve(const nlohmann::json& params);
void process_onion_exit(std::string_view payload,
std::function<void(oxen::Response)> cb);
std::function<void(Response)> cb);
void process_lns_request(std::string name_hash,
std::function<void(oxen::Response)> cb);
std::function<void(Response)> cb);
// ===================================
public:
RequestHandler(boost::asio::io_context& ioc, ServiceNode& sn,
const ChannelEncryption& ce);
RequestHandler(ServiceNode& sn, const ChannelEncryption& ce);
// Process all Session client requests
void process_client_req(std::string_view req_json,
std::function<void(oxen::Response)> cb);
std::function<void(Response)> cb);
/// Verifies snode pubkey and signature values in a request; returns the sender pubkey on
/// success or a filled-out error Response if verification fails.
///
/// `prevalidate` - if true, do a "pre-validation": check that the required header values
/// (pubkey, signature) are present and valid (including verifying that the pubkey is a valid
/// snode) but don't actually verify the signature against the body (note that this is *not*
/// signature verification but is used as a pre-check before reading a body to ensure the
/// required headers are present).
std::variant<legacy_pubkey, Response> validate_snode_signature(
const Request& r, bool headers_only = false);
// Processes a swarm test request
Response process_storage_test_req(Request r);
// Forwards a request to oxend RPC. `params` should contain:
// - endpoint -- the name of the rpc endpoint; currently allowed are `ons_resolve` and
@ -137,7 +126,7 @@ class RequestHandler {
// Returns (via the response callback) the oxend JSON object on success; on failure returns
// a failure response with a body of the error string.
void process_oxend_request(const nlohmann::json& params,
std::function<void(oxen::Response)> cb);
std::function<void(Response)> cb);
// Test only: retrieve all db entires
Response process_retrieve_all();
@ -146,13 +135,13 @@ class RequestHandler {
void process_proxy_exit(
const x25519_pubkey& client_key,
std::string_view payload,
std::function<void(oxen::Response)> cb);
std::function<void(Response)> cb);
void process_onion_to_url(const std::string& protocol,
const std::string& host, const uint16_t port,
const std::string& target,
const std::string& payload,
std::function<void(oxen::Response)> cb);
std::function<void(Response)> cb);
// The result will arrive asynchronously, so it needs a callback handler
void process_onion_req(std::string_view ciphertext, OnionRequestMetadata data);

View file

@ -1,28 +0,0 @@
#include "security.h"
#include "oxend_key.h"
#include "signature.h"
#include <oxenmq/base64.h>
#include <filesystem>
#include <fstream>
namespace oxen {
Security::Security(legacy_keypair key_pair,
std::filesystem::path base_path)
: key_pair_{std::move(key_pair)}, base_path_{std::move(base_path)} {}
void Security::generate_cert_signature() {
std::ifstream file{base_path_ / "cert.pem"};
if (!file.is_open()) {
throw std::runtime_error("Could not find cert.pem");
}
std::string cert_pem((std::istreambuf_iterator<char>(file)),
std::istreambuf_iterator<char>());
const auto hash = hash_data(cert_pem);
const auto sig = generate_signature(hash, key_pair_);
std::string_view raw_sig{reinterpret_cast<const char*>(&sig), sizeof(sig)};
cert_signature_ = oxenmq::to_base64(raw_sig);
}
} // namespace oxen

View file

@ -1,25 +0,0 @@
#pragma once
#include <filesystem>
#include <string>
#include "oxend_key.h"
namespace oxen {
class Security {
public:
Security(legacy_keypair key_pair,
std::filesystem::path base_path);
void generate_cert_signature();
const std::string& get_cert_signature() const {
return cert_signature_;
}
private:
legacy_keypair key_pair_;
std::string cert_signature_;
std::filesystem::path base_path_;
};
} // namespace oxen

View file

@ -1,7 +1,5 @@
#include "server_certificates.h"
#include <boost/asio/buffer.hpp>
extern "C" {
#include <openssl/conf.h>
#include <openssl/crypto.h>
@ -15,7 +13,6 @@ extern "C" {
#include "oxen_logger.h"
#include <cstddef>
#include <fstream>
namespace oxen {
@ -204,38 +201,4 @@ err:
BIO_free(bio_err);
}
void load_server_certificate(const std::filesystem::path& base_path,
boost::asio::ssl::context& ctx) {
/*
The certificate was generated from CMD.EXE on Windows 10 using:
winpty openssl dhparam -out dh.pem 2048
winpty openssl req -newkey rsa:2048 -nodes -keyout key.pem -x509 -days
10000 -out cert.pem -subj "//C=US\ST=CA\L=Los
Angeles\O=Beast\CN=www.example.com"
*/
const auto cert_path = base_path / "cert.pem";
const auto key_path = base_path / "key.pem";
const auto dh_path = base_path / "dh.pem";
if (!std::filesystem::exists(cert_path) ||
!std::filesystem::exists(key_path)) {
generate_cert(cert_path, key_path);
}
if (!std::filesystem::exists(dh_path)) {
generate_dh_pem(dh_path);
}
ctx.set_options(boost::asio::ssl::context::default_workarounds |
boost::asio::ssl::context::no_sslv2 |
boost::asio::ssl::context::single_dh_use);
ctx.use_certificate_chain_file(cert_path.u8string());
ctx.use_private_key_file(key_path.u8string(),
boost::asio::ssl::context::file_format::pem);
ctx.use_tmp_dh_file(dh_path.u8string());
}
}

View file

@ -1,12 +1,10 @@
#pragma once
#include <filesystem>
#include <boost/asio/ssl/context.hpp>
namespace oxen {
void generate_dh_pem(const std::filesystem::path& dh_path);
void generate_cert(const std::filesystem::path& cert_path, const std::filesystem::path& key_path);
void load_server_certificate(const std::filesystem::path& base_path,
boost::asio::ssl::context& ctx);
}

View file

@ -2,15 +2,16 @@
#include "Database.hpp"
#include "Item.hpp"
#include "http.h"
#include "http_connection.h"
#include "https_client.h"
#include "lmq_server.h"
#include "net_stats.h"
#include "oxen_common.h"
#include "oxen_logger.h"
#include "oxend_key.h"
#include "serialization.h"
#include "signature.h"
#include "string_utils.hpp"
#include "utils.hpp"
#include "version.h"
#include <nlohmann/json.hpp>
@ -23,7 +24,6 @@
#include <algorithm>
#include <chrono>
#include <fstream>
#include <string_view>
#include <boost/bind/bind.hpp>
@ -56,18 +56,16 @@ constexpr std::chrono::seconds OXEND_PING_INTERVAL = 30s;
constexpr int CLIENT_RETRIEVE_MESSAGE_LIMIT = 100;
ServiceNode::ServiceNode(
boost::asio::io_context& ioc,
sn_record_t address,
const legacy_seckey& skey,
OxenmqServer& lmq_server,
const std::filesystem::path& db_location,
const bool force_start)
: ioc_(ioc),
db_(std::make_unique<Database>(db_location)),
const bool force_start) :
force_start_{force_start},
db_{std::make_unique<Database>(db_location)},
our_address_{std::move(address)},
our_seckey_{skey},
lmq_server_(lmq_server),
force_start_(force_start) {
lmq_server_{lmq_server} {
swarm_ = std::make_unique<Swarm>(our_address_);
@ -83,6 +81,8 @@ ServiceNode::ServiceNode(
lmq_server_->add_timer([this] { std::lock_guard l{sn_mutex_}; all_stats_.cleanup(); },
STATS_CLEANUP_INTERVAL);
ioc_.run();
// We really want to make sure nodes don't get stuck in "syncing" mode,
// so if we are still "syncing" after a long time, activate SN regardless
auto delay_timer = std::make_shared<boost::asio::steady_timer>(ioc_);
@ -283,11 +283,21 @@ void ServiceNode::bootstrap_data() {
}
}
void ServiceNode::shutdown() {
shutting_down_ = true;
ioc_.stop();
}
bool ServiceNode::snode_ready(std::string* reason) {
if (shutting_down()) {
if (reason) *reason = "shutting down";
return false;
}
std::lock_guard guard(sn_mutex_);
std::vector<std::string> problems;
if (!hf_at_least(STORAGE_SERVER_HARDFORK))
problems.push_back("not yet on hardfork " + std::to_string(STORAGE_SERVER_HARDFORK));
if (!swarm_ || !swarm_->is_valid())
@ -325,8 +335,7 @@ void ServiceNode::send_onion_to_sn(const sn_record_t& sn,
// Calls callback on success only?
void ServiceNode::send_to_sn(const sn_record_t& sn, ss_client::ReqMethod method,
ss_client::Request req,
ss_client::Callback cb) const {
Request req, ss_client::Callback cb) const {
std::lock_guard guard(sn_mutex_);
@ -339,7 +348,7 @@ void ServiceNode::send_to_sn(const sn_record_t& sn, ss_client::ReqMethod method,
break;
}
case ss_client::ReqMethod::PROXY_EXIT: {
auto client_key = req.headers.find(OXEN_SENDER_KEY_HEADER);
auto client_key = req.headers.find(http::SENDER_KEY_HEADER);
// I could just always assume that we are passing the right
// parameters...
@ -350,7 +359,7 @@ void ServiceNode::send_to_sn(const sn_record_t& sn, ss_client::ReqMethod method,
std::move(cb), client_key->second, req.body);
} else {
OXEN_LOG(debug, "Developer error: no {} passed in headers",
OXEN_SENDER_KEY_HEADER);
http::SENDER_KEY_HEADER);
// TODO: call cb?
assert(false);
}
@ -369,18 +378,12 @@ void ServiceNode::send_to_sn(const sn_record_t& sn, ss_client::ReqMethod method,
void ServiceNode::relay_data_reliable(const std::string& blob,
const sn_record_t& sn) const {
auto reply_callback = [](bool success, std::vector<std::string> data) {
if (!success) {
OXEN_LOG(error, "Failed to send batch data: time-out");
}
};
OXEN_LOG(debug, "Relaying data to: {}", sn.pubkey_legacy);
auto req = ss_client::Request{blob, {}};
this->send_to_sn(sn, ss_client::ReqMethod::DATA, std::move(req),
reply_callback);
send_to_sn(sn, ss_client::ReqMethod::DATA, Request{blob},
[](bool success, auto&& data) {
if (!success) OXEN_LOG(error, "Failed to send batch data: time-out");
});
}
void ServiceNode::record_proxy_request() { all_stats_.bump_proxy_requests(); }
@ -833,9 +836,9 @@ void ServiceNode::attach_signature(request_t& request,
raw_sig.insert(raw_sig.end(), sig.r.begin(), sig.r.end());
const std::string sig_b64 = oxenmq::to_base64(raw_sig);
request.set(OXEN_SNODE_SIGNATURE_HEADER, sig_b64);
request.set(http::SNODE_SIGNATURE_HEADER, sig_b64);
request.set(OXEN_SENDER_SNODE_PUBKEY_HEADER,
request.set(http::SNODE_SENDER_HEADER,
oxenmq::to_base32z(our_address_.pubkey_legacy.view()));
}
@ -1049,9 +1052,10 @@ bool ServiceNode::derive_tester_testee(uint64_t blk_height, sn_record_t& tester,
return true;
}
MessageTestStatus ServiceNode::process_storage_test_req(
uint64_t blk_height, const legacy_pubkey& tester_pk,
const std::string& msg_hash, std::string& answer) {
std::pair<MessageTestStatus, std::string> ServiceNode::process_storage_test_req(
uint64_t blk_height,
const legacy_pubkey& tester_pk,
const std::string& msg_hash) {
std::lock_guard guard(sn_mutex_);
@ -1061,7 +1065,7 @@ MessageTestStatus ServiceNode::process_storage_test_req(
if (blk_height > block_height_) {
OXEN_LOG(debug, "Our blockchain is behind, height: {}, requested: {}",
block_height_, blk_height);
return MessageTestStatus::RETRY;
return {MessageTestStatus::RETRY, ""};
}
// 2. Check tester/testee pair
@ -1072,7 +1076,7 @@ MessageTestStatus ServiceNode::process_storage_test_req(
if (testee != our_address_) {
OXEN_LOG(error, "We are NOT the testee for height: {}", blk_height);
return MessageTestStatus::WRONG_REQ;
return {MessageTestStatus::WRONG_REQ, ""};
}
if (tester.pubkey_legacy != tester_pk) {
@ -1082,7 +1086,7 @@ MessageTestStatus ServiceNode::process_storage_test_req(
OXEN_LOG(critical, "ABORT in integration test");
std::abort();
#endif
return MessageTestStatus::WRONG_REQ;
return {MessageTestStatus::WRONG_REQ, ""};
} else {
OXEN_LOG(trace, "Tester is valid: {}", tester_pk);
}
@ -1091,38 +1095,38 @@ MessageTestStatus ServiceNode::process_storage_test_req(
// 3. If for a current/past block, try to respond right away
Item item;
if (!db_->retrieve_by_hash(msg_hash, item)) {
return MessageTestStatus::RETRY;
return {MessageTestStatus::RETRY, ""};
}
answer = item.data;
return MessageTestStatus::SUCCESS;
return {MessageTestStatus::SUCCESS, std::move(item.data)};
}
bool ServiceNode::select_random_message(Item& item) {
std::optional<Item> ServiceNode::select_random_message() {
uint64_t message_count;
if (!db_->get_message_count(message_count)) {
OXEN_LOG(error, "Could not count messages in the database");
return false;
return {};
}
OXEN_LOG(debug, "total messages: {}", message_count);
if (message_count == 0) {
OXEN_LOG(debug, "No messages in the database to initiate a peer test");
return false;
return {};
}
// SNodes don't have to agree on this, rather they should use different
// messages
const auto msg_idx = util::uniform_distribution_portable(message_count);
if (!db_->retrieve_by_index(msg_idx, item)) {
auto item = std::make_optional<Item>();
if (!db_->retrieve_by_index(msg_idx, *item)) {
OXEN_LOG(error, "Could not retrieve message by index: {}", msg_idx);
return false;
return {};
}
return true;
return item;
}
void ServiceNode::initiate_peer_test() {
@ -1158,19 +1162,12 @@ void ServiceNode::initiate_peer_test() {
return;
}
/// 2. Storage Testing
{
// 2.1. Select a message
Item item;
if (!this->select_random_message(item)) {
OXEN_LOG(debug, "Could not select a message for testing");
} else {
OXEN_LOG(trace, "Selected random message: {}, {}", item.hash,
item.data);
// 2.2. Initiate testing request
this->send_storage_test_req(testee, test_height, item);
}
/// 2. Storage Testing: initiate a testing request with a randomly selected message
if (auto item = select_random_message()) {
OXEN_LOG(trace, "Selected random message: {}, {}", item->hash, item->data);
send_storage_test_req(testee, test_height, *item);
} else {
OXEN_LOG(debug, "Could not select a message for testing");
}
}
@ -1364,9 +1361,11 @@ std::string ServiceNode::get_stats() const {
val["total_stored"] = total_stored;
}
val["connections_in"] = get_net_stats().connections_in.load();
val["http_connections_out"] = get_net_stats().http_connections_out.load();
val["https_connections_out"] = get_net_stats().https_connections_out.load();
// TODO: figure out some more interesting stats (these counters don't tell us much at all except
// for, maybe, a slow loris attack in progress, and so were removed)
val["connections_in"] = -1;
val["http_connections_out"] = -1;
val["https_connections_out"] = -1;
/// we want pretty (indented) json, but might change that in the future
constexpr bool PRETTY = true;
@ -1405,9 +1404,10 @@ std::string ServiceNode::get_status_line() const {
s << "; " << total_stored << " msgs";
s << "; reqs(S/R): " << all_stats_.get_total_store_requests() << '/'
<< all_stats_.get_total_retrieve_requests();
s << "; conns(in/http/https): " << get_net_stats().connections_in << '/'
// FIXME: something better?
/*s << "; conns(in/http/https): " << get_net_stats().connections_in << '/'
<< get_net_stats().http_connections_out << '/'
<< get_net_stats().https_connections_out;
<< get_net_stats().https_connections_out;*/
return s.str();
}

View file

@ -2,12 +2,8 @@
#include <Database.hpp>
#include <chrono>
#include <fstream>
#include <iostream>
#include <memory>
#include <optional>
#include <thread>
#include <unordered_map>
#include <boost/asio.hpp>
#include <boost/beast/http.hpp>
@ -19,8 +15,8 @@
#include "stats.h"
#include "swarm.h"
namespace http = boost::beast::http;
using request_t = http::request<http::string_body>;
namespace bhttp = boost::beast::http;
using request_t = bhttp::request<bhttp::string_body>;
namespace oxen {
@ -44,21 +40,16 @@ class OxenmqServer;
struct OnionRequestMetadata;
namespace ss_client {
class Request;
namespace ss_client {
enum class ReqMethod;
using Callback = std::function<void(bool success, std::vector<std::string>)>;
} // namespace ss_client
namespace http_server {
class connection_t;
}
struct oxend_key_pair_t;
using connection_ptr = std::shared_ptr<http_server::connection_t>;
class Swarm;
struct signature;
@ -70,13 +61,13 @@ enum class SnodeStatus { UNKNOWN, UNSTAKED, DECOMMISSIONED, ACTIVE };
/// All service node logic that is not network-specific
class ServiceNode {
using listeners_t = std::vector<connection_ptr>;
boost::asio::io_context& ioc_;
boost::asio::io_context ioc_{1};
bool syncing_ = true;
bool active_ = false;
bool got_first_response_ = false;
bool force_start_ = false;
std::atomic<bool> shutting_down_ = false;
int hardfork_ = 0;
uint64_t block_height_ = 0;
uint64_t target_height_ = 0;
@ -98,8 +89,6 @@ class ServiceNode {
// causing a deadlock
OxenmqServer& lmq_server_;
bool force_start_ = false;
std::atomic<int> oxend_pings_ = 0; // Consecutive successful pings, used for batching logs about it
// Will be set to true while we have an outstanding update_swarms() call so that we squelch
@ -175,8 +164,8 @@ class ServiceNode {
/// Check if it is our turn to test and initiate peer test if so
void initiate_peer_test();
// Select a random message from our database, return false on error
bool select_random_message(storage::Item& item); // mutex not needed
/// Select a random message from our database, return nullopt on error
std::optional<storage::Item> select_random_message(); // mutex not needed
// Initiate node ping tests
void test_reachability(const sn_record_t& sn, int previous_failures);
@ -187,8 +176,7 @@ class ServiceNode {
void sign_request(request_t& req) const;
public:
ServiceNode(boost::asio::io_context& ioc,
sn_record_t address,
ServiceNode(sn_record_t address,
const legacy_seckey& skey,
OxenmqServer& omq_server,
const std::filesystem::path& db_location,
@ -213,13 +201,29 @@ class ServiceNode {
// TODO: move this eventually out of SN
// Send by either http or omq
void send_to_sn(const sn_record_t& sn, ss_client::ReqMethod method,
ss_client::Request req, ss_client::Callback cb) const;
Request req, ss_client::Callback cb) const;
bool hf_at_least(int hardfork) const { return hardfork_ >= hardfork; }
// Return true if the service node is ready to start running
// Return true if the service node is ready to handle requests, which means the storage server
// is fully initialized (and not trying to shut down), the service node is active and assigned
// to a swarm and is not syncing.
//
// Teturns false and (if `reason` is non-nullptr) sets a reason string during initialization and
// while shutting down.
//
// If this ServiceNode was created with force_start enabled then this function always returns
// true (except when shutting down); the reason string is still set (when non-null) when errors
// would have occured without force_start.
bool snode_ready(std::string* reason = nullptr);
// Puts the storage server into shutdown mode; this operation is irreversible and should only be
// used during storage server shutdown.
void shutdown();
// Returns true if the storage server is currently shutting down.
bool shutting_down() const { return shutting_down_; }
/// Process message received from a client, return false if not in a swarm
bool process_store(const message_t& msg);
@ -227,10 +231,9 @@ class ServiceNode {
void process_push_batch(const std::string& blob);
// Attempt to find an answer (message body) to the storage test
MessageTestStatus process_storage_test_req(uint64_t blk_height,
std::pair<MessageTestStatus, std::string> process_storage_test_req(uint64_t blk_height,
const legacy_pubkey& tester_addr,
const std::string& msg_hash,
std::string& answer);
const std::string& msg_hash);
bool is_pubkey_for_us(const user_pubkey_t& pk) const;
@ -266,6 +269,8 @@ class ServiceNode {
void update_swarms();
OxenmqServer& omq_server() { return lmq_server_; }
boost::asio::io_context& ioc() { return ioc_; }
};
} // namespace oxen

View file

@ -1,5 +1,7 @@
add_library(utils STATIC
src/utils.cpp
src/string_utils.cpp
src/file.cpp
)
target_include_directories(utils PUBLIC ${CMAKE_CURRENT_LIST_DIR}/include)

View file

@ -34,35 +34,6 @@ uint64_t uniform_distribution_portable(std::mt19937_64& mersenne_twister,
/// Return the open file limit (-1 on failure)
int get_fd_limit();
inline bool ends_with(std::string_view str, std::string_view suffix) {
return str.size() >= suffix.size() &&
str.substr(str.size() - suffix.size()) == suffix;
}
inline bool starts_with(std::string_view str, std::string_view prefix) {
return str.substr(0, prefix.size()) == prefix;
}
/// Joins [begin, end) with a delimiter and returns the resulting string. Elements can be anything
/// that can be sent to an ostream via `<<`. The OSS template here is mainly to trick the compiler
/// (especially macos clang) into being happy with this include even when std::ostringstream isn't
/// yet available (and to put the include responsibility on the caller).
template <typename It, typename OSS = std::ostringstream>
std::string join(std::string_view delimiter, It begin, It end) {
OSS o;
if (begin != end)
o << *begin++;
while (begin != end)
o << delimiter << *begin++;
return o.str();
}
/// Wrapper around the above that takes a container and passes c.begin(), c.end() to the above.
template <typename Container>
std::string join(std::string_view delimiter, const Container& c) {
return join(delimiter, c.begin(), c.end());
}
std::optional<std::filesystem::path> get_home_dir();
} // namespace util

View file

@ -21,7 +21,7 @@ if(SPDLOG_FOUND)
else()
add_subdirectory(spdlog)
endif()
if(NOT TARGET sodium)
# Allow -D DOWNLOAD_SODIUM=FORCE to download without even checking for a local libsodium
@ -49,7 +49,7 @@ endif()
option(FORCE_OXENMQ_SUBMODULE "force using oxenmq submodule" OFF)
if(NOT STATIC AND NOT FORCE_OXENMQ_SUBMODULE)
pkg_check_modules(OXENMQ liboxenmq>=1.2.5 IMPORTED_TARGET)
pkg_check_modules(OXENMQ liboxenmq>=1.2.6 IMPORTED_TARGET)
endif()
if(OXENMQ_FOUND)
add_library(oxenmq INTERFACE)
@ -72,3 +72,41 @@ add_library(oxen-crypto-ops STATIC
target_include_directories(oxen-crypto-ops PUBLIC .)
# uSockets doesn't really have a proper build system (just a very simple Makefile) so build it
# ourselves.
if (NOT CMAKE_VERSION VERSION_LESS 3.12)
set(conf_depends "CONFIGURE_DEPENDS")
else()
set(conf_depends "")
endif()
file(GLOB usockets_src ${conf_depends}
uWebSockets/uSockets/src/*.c
uWebSockets/uSockets/src/eventing/*.c
uWebSockets/uSockets/src/crypto/*.c
uWebSockets/uSockets/src/crypto/*.cpp)
file(COPY uWebSockets/uSockets/src/libusockets.h DESTINATION uWebSockets)
add_library(uSockets STATIC EXCLUDE_FROM_ALL ${usockets_src})
target_include_directories(uSockets PRIVATE uWebSockets/uSockets/src)
target_compile_definitions(uSockets PRIVATE LIBUS_USE_OPENSSL)
target_compile_features(uSockets PRIVATE c_std_11 cxx_std_17)
target_link_libraries(uSockets OpenSSL::SSL OpenSSL::Crypto)
# On Windows uSockets uses libuv for its event loop; on Mac kqueue is the default, but that seems to
# not be reliable on older macos versions (like 10.12), so we use libuv on macos as well.
if (WIN32 OR (APPLE AND NOT IOS))
add_subdirectory(libuv EXCLUDE_FROM_ALL)
target_link_libraries(uSockets uv_a)
target_compile_definitions(uSockets PUBLIC LIBUS_USE_LIBUV)
endif()
# The uWebSockets C++ layer is header-only but isn't actually prefixed in the repository itself, but
# rather only on install (which, as above, is just a very simple Makefile). This is unfortunate
# because it means that we can't use `#include <uWebSockets/App.h>` directly with the repo; so
# instead we emulate the installation process into the build directory and include it (with the
# prefix) from there.
file(COPY uWebSockets/src/ DESTINATION uWebSockets/uWebSockets FILES_MATCHING PATTERN "*.h" PATTERN "*.hpp")
add_library(uWebSockets INTERFACE)
target_include_directories(uWebSockets SYSTEM INTERFACE ${CMAKE_CURRENT_BINARY_DIR}/uWebSockets)
target_link_libraries(uWebSockets INTERFACE uSockets)
target_compile_definitions(uWebSockets INTERFACE UWS_HTTPRESPONSE_NO_WRITEMARK UWS_NO_ZLIB)

2
vendors/loki-mq vendored

@ -1 +1 @@
Subproject commit 51754037ea19204610751c2ea8ae72b7ed6c1818
Subproject commit cdc6a9709c3299b1529c8479804b37491f083551