Replace epee http rpc server with uWebSockets

This replaces the NIH epee http server which does not work all that well
with an external C++ library called uWebSockets.  Fundamentally this
gives the following advantages:

- Much less code to maintain
- Just one thread for handling HTTP connections versus epee's pool of
threads
- Uses existing LokiMQ job server and existing thread pool for handling
the actual tasks; they are processed/scheduled in the same "rpc" or
"admin" queues as lokimq rpc calls.  One notable benefit is that "admin"
rpc commands get their own queue (and thus cannot be delayed by long rpc
commands).  Currently the lokimq threads and the http rpc thread pool
and the p2p thread pool and the job queue thread pool and the dns lookup
thread pool and... are *all* different thread pools; this is a step
towards consolidating them.
- Very little mutex contention (which has been a major problem with epee
RPC in the past): there is one mutex (inside uWebSockets) for putting
responses back into the thread managing the connection; everything
internally gets handled through (lock-free) lokimq inproc sockets.
- Faster RPC performance on average, and much better worst case
performance.  Epee's http interface seems to have some race condition
that ocassionally stalls a request (even a very simple one) for a dozen
or more seconds for no good reason.
- Long polling gets redone here to no longer need threads; instead we
just store the request and respond when the thread pool, or else in a
timer (that runs once/second) for timing out long polls.

---

The basic idea of how this works from a high level:

We launch a single thread to handle HTTP RPC requests and response data.
This uWebSockets thread is essentially running an event loop: it never
actually handles any logic; it only serves to shuttle data that arrives
in a request to some other thread, and then, at some later point, to
send some reply back to that waiting connection.  Everything is
asynchronous and non-blocking here: the basic uWebSockets event loop
just operates as things arrive, passes it off immediately, and goes back
to waiting for the next thing to arrive.

The basic flow is like this:

    0. uWS thread -- listens on localhost:22023
    1. uWS thread -- incoming request on localhost:22023
    2. uWS thread -- fires callback, which injects the task into the LokiMQ job queue
    3. LMQ main loop -- schedules it as an RPC job
    4. LMQ rpc thread -- Some LokiMQ thread runs it, gets the result
    5. LMQ rpc thread -- Result gets queued up for the uWS thread
    6. uWS thread -- takes the request and starts sending it
       (asynchronously) back to the requestor.

In more detail:

uWebSockets has registered has registered handlers for non-jsonrpc
requests (legacy JSON or binary).  If the port is restricted then admin
commands get mapped to a "Access denied" response handler, otherwise
public commands (and admin commands on an unrestricted port) go to the
rpc command handler.

POST requests to /json_rpc have their own handler; this is a little
different than the above because it has to parse the request before it
can determine whether it is allowed or not, but once this is done it
continues roughly the same as legacy/binary requests.

uWebSockets then listens on the given IP/port for new incoming requests,
and starts listening for requests in a thread (we own this thread).
When a request arrives, it fires the event handler for that request.
(This may happen multiple times, if the client is sending a bunch of
data in a POST request).  Once we have the full request, we then queue
the job in LokiMQ, putting it in the "rpc" or "admin" command
categories.  (The one practical different here is that "admin" is
configured to be allowed to start up its own thread if all other threads
are busy, while "rpc" commands are prioritized along with everything
else.)  LokiMQ then schedules this, along with native LokiMQ "rpc." or
"admin." requests.

When a LMQ worker thread becomes available, the RPC command gets called
in it and runs.  Whatever output it produces (or error message, if it
throws) then gets wrapped up in jsonrpc boilerplate (if necessary), and
delivered to the uWebSockets thread to be sent in reply to that request.

uWebSockets picks up the data and sends whatever it can without
blocking, then buffers whatever it couldn't send to be sent again in a
later event loop iteration once the requestor can accept more data.
(This part is outside lokid; we only have to give uWS the data and let
it worry about delivery).

---

PR specifics:

Things removed from this PR:

1. ssl settings; with this PR the HTTP RPC interface is plain-text.  The
previous default generated a self-signed certificate for the server on
startup and then the client accepted any certificate.  This is actually
*worse* than unencrypted because it is entirely MITM-readable and yet
might make people think that their RPC communication is encrypted, and
setting up actual certificates is difficult enough that I think most
people don't bother.

uWebSockets *does* support HTTPS, and we could glue the existing options
into it, but I'm not convinced it's worthwhile: it works much better to
put HTTPS in a front-end proxy holding the certificate that proxies
requests to the backend (which can then listen in restricted mode on
some localhost port).  One reason this is better is that it is much
easier to reload and/or restart such a front-end server, while
certificate updates with lokid require a full restart.  Another reason
is that you get an error page instead of a timeout if something is wrong
with the backend.  Finally we also save having to generate a temporary
certificate on *every* lokid invocation.

2. HTTP Digest authentication.  Digest authentication is obsolete (and
was already obsolete when it got added to Monero).  HTTP-Digest was
originally an attempt to provide a password authentication mechanism
that does not leak the password in transit, but still required that the
server know the password.  It only has marginal value against replay
attacks, and is made entirely obsolete by sending traffic over HTTPS
instead.  No client out there supports Digest but *not* Basic auth, and
so given the limited usefulness it seems pointless to support more than
Basic auth for HTTP RPC login.

What's worse is that epee's HTTP Digest authentication is a terrible
implementation: it uses boost::spirit -- a recursive descent parser
meant for building complex language grammars -- just to parse a single
HTTP header for Digest auth.  This is a big load of crap that should
never have been accepted upstream, and that we should get rid of (even
if we wanted to support Digest auth it takes less than 100 lines of code
to do it when *not* using a recursive descent parser).
This commit is contained in:
Jason Rhinelander 2020-06-28 20:23:06 -03:00
parent 463b2cbf01
commit 42a7e83c33
30 changed files with 945 additions and 560 deletions

3
.gitmodules vendored
View File

@ -16,3 +16,6 @@
[submodule "external/googletest"]
path = external/googletest
url = https://github.com/google/googletest.git
[submodule "external/uWebSockets"]
path = external/uWebSockets
url = https://github.com/uNetworking/uWebSockets.git

View File

@ -280,6 +280,7 @@ if(NOT MANUAL_SUBMODULES)
if(BUILD_TESTS)
check_submodule(external/googletest)
endif()
check_submodule(external/uWebSockets uSockets)
endif()
endif()

View File

@ -90,3 +90,28 @@ endif()
add_subdirectory(db_drivers)
add_subdirectory(easylogging++)
add_subdirectory(randomx EXCLUDE_FROM_ALL)
# uSockets doesn't really have a proper build system (just a very simple Makefile) so build it
# ourselves.
if (NOT CMAKE_VERSION VERSION_LESS 3.12)
set(conf_depends "CONFIGURE_DEPENDS")
else()
set(conf_depends "")
endif()
file(GLOB usockets_src ${conf_depends} uWebSockets/uSockets/src/*.c uWebSockets/uSockets/src/eventing/*.c)
file(COPY uWebSockets/uSockets/src/libusockets.h DESTINATION uWebSockets)
add_library(uSockets STATIC EXCLUDE_FROM_ALL ${usockets_src})
target_compile_definitions(uSockets PRIVATE LIBUS_NO_SSL=1)
target_include_directories(uSockets PRIVATE uWebSockets/uSockets/src)
# The uWebSockets C++ layer is header-only but isn't actually prefixed in the repository itself, but
# rather only on install (which, as above, is just a very simple Makefile). This is unfortunate
# because it means that we can't use `#include <uWebSockets/App.h>` directly with the repo; so
# instead we emulate the installation process into the build directory and include it (with the
# prefix) from there.
file(COPY uWebSockets/src/ DESTINATION uWebSockets/uWebSockets FILES_MATCHING PATTERN "*.h" PATTERN "*.hpp")
add_library(uWebSockets INTERFACE)
target_include_directories(uWebSockets INTERFACE ${CMAKE_CURRENT_BINARY_DIR}/uWebSockets)
target_link_libraries(uWebSockets INTERFACE uSockets)
target_compile_definitions(uWebSockets INTERFACE UWS_HTTPRESPONSE_NO_WRITEMARK UWS_NO_ZLIB)

2
external/loki-mq vendored

@ -1 +1 @@
Subproject commit 7cd58e46770549a409e027308d7959b39281ceaa
Subproject commit af189a8d72f1c0614a72014785fc8b65cdb0fbff

1
external/uWebSockets vendored Submodule

@ -0,0 +1 @@
Subproject commit 8dfabd559d4a54f29bd04d397223da0140610c8e

View File

@ -1,5 +1,6 @@
#include "string_util.h"
#include <cassert>
#include <sstream>
namespace tools {
@ -75,4 +76,40 @@ std::string lowercase_ascii_string(std::string src)
return src;
}
std::string friendly_duration(std::chrono::nanoseconds dur) {
std::ostringstream os;
bool some = false;
if (dur >= 24h) {
os << dur / 24h << 'd';
dur %= 24h;
some = true;
}
if (dur >= 1h || some) {
os << dur / 1h << 'h';
dur %= 1h;
some = true;
}
if (dur >= 1min || some) {
os << dur / 1min << 'm';
dur %= 1min;
some = true;
}
if (some) {
// If we have >= minutes then don't bother with fractional seconds
os << dur / 1s << 's';
} else {
double seconds = std::chrono::duration<double>(dur).count();
os.precision(3);
if (dur >= 1s)
os << seconds << "s";
else if (dur >= 1ms)
os << seconds * 1000 << "ms";
else if (dur >= 1us)
os << seconds * 1'000'000 << u8"µs";
else
os << seconds * 1'000'000'000 << "ns";
}
return os.str();
}
}

View File

@ -4,10 +4,13 @@
#include <iterator>
#include <charconv>
#include <sstream>
#include <chrono>
#include "span.h" // epee
namespace tools {
using namespace std::literals;
/// Returns true if the first string is equal to the second string, compared case-insensitively.
inline bool string_iequal(std::string_view s1, std::string_view s2) {
return std::equal(s1.begin(), s1.end(), s2.begin(), s2.end(), [](char a, char b) {
@ -108,4 +111,7 @@ std::string copy_guts(const T& val) {
std::string lowercase_ascii_string(std::string src);
/// Converts a duration into a human friendlier string.
std::string friendly_duration(std::chrono::nanoseconds dur);
}

View File

@ -275,19 +275,27 @@ namespace cryptonote
// Loads stubs that fail if invoked. The stubs are replaced in the cryptonote_protocol/quorumnet.cpp glue code.
[[noreturn]] static void need_core_init() {
throw std::logic_error("Internal error: quorumnet::init_core_callbacks() should have been called");
throw std::logic_error("Internal error: core callback initialization was not performed!");
}
void *(*quorumnet_new)(core&);
void (*quorumnet_delete)(void*&self);
void (*quorumnet_relay_obligation_votes)(void* self, const std::vector<service_nodes::quorum_vote_t>&);
std::future<std::pair<blink_result, std::string>> (*quorumnet_send_blink)(core& core, const std::string& tx_blob);
void (*long_poll_trigger)(tx_memory_pool& pool);
static bool init_core_callback_stubs() {
quorumnet_new = [](core&) -> void* { need_core_init(); };
quorumnet_delete = [](void*&) { need_core_init(); };
quorumnet_relay_obligation_votes = [](void*, const std::vector<service_nodes::quorum_vote_t>&) { need_core_init(); };
quorumnet_send_blink = [](core&, const std::string&) -> std::future<std::pair<blink_result, std::string>> { need_core_init(); };
long_poll_trigger = [](tx_memory_pool&) { need_core_init(); };
return false;
}
// This variable is only here to let us call the above during static initialization.
bool init_core_callback_complete = init_core_callback_stubs();
//-----------------------------------------------------------------------------------------------
@ -1134,7 +1142,6 @@ namespace cryptonote
if (m_quorumnet_state)
quorumnet_delete(m_quorumnet_state);
m_lmq.reset();
m_long_poll_wake_up_clients.notify_all();
m_service_node_list.store();
m_miner.stop();
m_mempool.deinit();
@ -1407,7 +1414,7 @@ namespace cryptonote
}
}
if (tx_pool_changed) m_long_poll_wake_up_clients.notify_all();
if (tx_pool_changed) long_poll_trigger(m_mempool);
return ok;
}
//-----------------------------------------------------------------------------------------------

View File

@ -92,8 +92,12 @@ namespace cryptonote
// Sends a blink tx to the current blink quorum, returns a future that can be used to wait for the
// result.
extern std::future<std::pair<blink_result, std::string>> (*quorumnet_send_blink)(core& core, const std::string& tx_blob);
extern bool init_core_callback_complete;
// Function pointer that we invoke when the mempool has changed; this gets set during
// rpc/http_server.cpp's init_options().
extern void (*long_poll_trigger)(tx_memory_pool& pool);
extern bool init_core_callback_complete;
/************************************************************************/
/* */
@ -998,9 +1002,6 @@ namespace cryptonote
* @return true
*/
bool relay_txpool_transactions();
std::mutex m_long_poll_mutex;
std::condition_variable m_long_poll_wake_up_clients;
private:
/**

View File

@ -44,9 +44,8 @@ command_parser_executor::command_parser_executor(
uint32_t ip
, uint16_t port
, const std::optional<tools::login>& login
, const epee::net_utils::ssl_options_t& ssl_options
)
: m_executor{ip, port, login, ssl_options}
: m_executor{ip, port, login}
{}
command_parser_executor::command_parser_executor(cryptonote::rpc::core_rpc_server& rpc_server)

View File

@ -48,7 +48,6 @@ public:
uint32_t ip
, uint16_t port
, const std::optional<tools::login>& login
, const epee::net_utils::ssl_options_t& ssl_options
);
/// Invokes via local daemon

View File

@ -48,9 +48,8 @@ command_server::command_server(
uint32_t ip
, uint16_t port
, const std::optional<tools::login>& login
, const epee::net_utils::ssl_options_t& ssl_options
)
: m_parser{ip, port, login, ssl_options}
: m_parser{ip, port, login}
{
init_commands();
}

View File

@ -49,7 +49,6 @@ public:
uint32_t ip
, uint16_t port
, const std::optional<tools::login>& login
, const epee::net_utils::ssl_options_t& ssl_options
);
/// Non-remote constructor

View File

@ -32,6 +32,7 @@
#include <memory>
#include <stdexcept>
#include <lokimq/lokimq.h>
#include <utility>
#include "misc_log_ex.h"
#if defined(PER_BLOCK_CHECKPOINT)
@ -62,53 +63,13 @@ extern "C" {
}
#endif
using namespace std::literals;
#undef LOKI_DEFAULT_LOG_CATEGORY
#define LOKI_DEFAULT_LOG_CATEGORY "daemon"
namespace daemonize {
http_rpc_server::http_rpc_server(boost::program_options::variables_map const &vm,
cryptonote::rpc::core_rpc_server &corerpc,
const bool restricted,
const std::string &port,
std::string description)
: m_server{corerpc}
, m_description{std::move(description)}
{
if (!m_server.init(vm, restricted, port))
{
throw std::runtime_error("Failed to initialize " + m_description + " HTTP RPC server.");
}
}
void http_rpc_server::run()
{
if (!m_server.run(m_server.m_max_long_poll_connections + cryptonote::rpc::http_server::DEFAULT_RPC_THREADS,
false /*wait - for all threads in the pool to exit when terminating*/))
{
throw std::runtime_error("Failed to start " + m_description + " HTTP RPC server.");
}
}
void http_rpc_server::stop()
{
m_server.send_stop_signal();
m_server.server_stop();
}
http_rpc_server::~http_rpc_server()
{
try
{
m_server.deinit();
}
catch (...)
{
MERROR("Failed to deinitialize " << m_description << " RPC server...");
}
}
static uint16_t parse_public_rpc_port(const boost::program_options::variables_map& vm)
{
const auto& public_node_arg = cryptonote::rpc::http_server::arg_public_node;
@ -171,14 +132,15 @@ daemon::daemon(boost::program_options::variables_map vm_) :
const auto restricted = command_line::get_arg(vm, cryptonote::rpc::http_server::arg_restricted_rpc);
const auto main_rpc_port = command_line::get_arg(vm, cryptonote::rpc::http_server::arg_rpc_bind_port);
MGINFO("- core HTTP RPC server");
http_rpcs.emplace_back(vm, *rpc, restricted, main_rpc_port, "core");
http_rpcs.emplace_back(std::piecewise_construct, std::tie("core"), std::tie(*rpc, vm, restricted, main_rpc_port));
}
if (!command_line::is_arg_defaulted(vm, cryptonote::rpc::http_server::arg_rpc_restricted_bind_port))
{
bool restricted = true;
auto restricted_rpc_port = command_line::get_arg(vm, cryptonote::rpc::http_server::arg_rpc_restricted_bind_port);
MGINFO("- restricted HTTP RPC server");
http_rpcs.emplace_back(vm, *rpc, true, restricted_rpc_port, "restricted");
http_rpcs.emplace_back(std::piecewise_construct, std::tie("restricted"), std::tie(*rpc, vm, restricted, restricted_rpc_port));
}
MGINFO_BLUE("Done daemon object initialization");
@ -189,7 +151,7 @@ daemon::~daemon()
MGINFO_BLUE("Deinitializing daemon objects...");
while (!http_rpcs.empty()) {
MGINFO("- " << http_rpcs.back().m_description << " HTTP RPC server");
MGINFO("- " << http_rpcs.back().first << " HTTP RPC server");
http_rpcs.pop_back();
}
@ -265,10 +227,14 @@ bool daemon::run(bool interactive)
if (!core->init(vm, nullptr, get_checkpoints))
throw std::runtime_error("Failed to start core");
for(auto& rpc: http_rpcs)
MGINFO("Starting LokiMQ");
lmq_rpc = std::make_unique<cryptonote::rpc::lmq_rpc>(*core, *rpc, vm);
core->start_lokimq();
for(auto& [desc, rpc]: http_rpcs)
{
MGINFO("Starting " << rpc.m_description << " HTTP RPC server");
rpc.run();
MGINFO("Starting " << desc << " HTTP RPC server");
rpc.start();
}
MGINFO("Starting RPC daemon handler");
@ -280,10 +246,6 @@ bool daemon::run(bool interactive)
p2p->set_rpc_port(public_rpc_port);
}
MGINFO("Starting LokiMQ");
lmq_rpc = std::make_unique<cryptonote::rpc::lmq_rpc>(*core, *rpc, vm);
core->start_lokimq();
std::unique_ptr<daemonize::command_server> rpc_commands;
if (interactive)
{
@ -307,10 +269,10 @@ bool daemon::run(bool interactive)
rpc_commands->stop_handling();
}
for (auto& rpc : http_rpcs)
for (auto& [desc, rpc] : http_rpcs)
{
MGINFO("Stopping " << rpc.m_description << " HTTP RPC server...");
rpc.stop();
MGINFO("Stopping " << desc << " HTTP RPC server...");
rpc.shutdown();
}
MGINFO("Node stopped.");

View File

@ -47,21 +47,6 @@
namespace daemonize
{
class http_rpc_server
{
public:
http_rpc_server(boost::program_options::variables_map const &vm,
cryptonote::rpc::core_rpc_server &corerpc,
const bool restricted,
const std::string &port,
std::string description);
void run();
void stop();
~http_rpc_server();
cryptonote::rpc::http_server m_server;
std::string m_description;
};
class daemon {
public:
@ -87,7 +72,7 @@ private:
std::unique_ptr<protocol_handler> protocol;
std::unique_ptr<node_server> p2p;
std::unique_ptr<cryptonote::rpc::core_rpc_server> rpc;
std::list<http_rpc_server> http_rpcs;
std::list<std::pair<std::string, cryptonote::rpc::http_server>> http_rpcs;
std::unique_ptr<cryptonote::rpc::lmq_rpc> lmq_rpc;
};

View File

@ -218,20 +218,14 @@ int main(int argc, char const * argv[])
{
const cryptonote::rpc_args::descriptors arg{};
auto rpc_ip_str = command_line::get_arg(vm, arg.rpc_bind_ip);
auto rpc_port_str = command_line::get_arg(vm, cryptonote::rpc::http_server::arg_rpc_bind_port);
auto rpc_port = command_line::get_arg(vm, cryptonote::rpc::http_server::arg_rpc_bind_port);
uint32_t rpc_ip;
uint16_t rpc_port;
if (!epee::string_tools::get_ip_int32_from_string(rpc_ip, rpc_ip_str))
{
std::cerr << "Invalid IP: " << rpc_ip_str << std::endl;
return 1;
}
if (!epee::string_tools::get_xtype_from_string(rpc_port, rpc_port_str))
{
std::cerr << "Invalid port: " << rpc_port_str << std::endl;
return 1;
}
const char *env_rpc_login = nullptr;
const bool has_rpc_arg = command_line::has_arg(vm, arg.rpc_login);
@ -252,11 +246,7 @@ int main(int argc, char const * argv[])
}
}
auto ssl_options = cryptonote::rpc_args::process_ssl(vm, true);
if (!ssl_options)
return 1;
daemonize::command_server rpc_commands{rpc_ip, rpc_port, std::move(login), std::move(*ssl_options)};
daemonize::command_server rpc_commands{rpc_ip, rpc_port, std::move(login)};
return rpc_commands.process_command_and_log(command) ? 0 : 1;
}
}

View File

@ -219,13 +219,13 @@ rpc_command_executor::rpc_command_executor(
uint32_t ip
, uint16_t port
, const std::optional<tools::login>& login
, const epee::net_utils::ssl_options_t& ssl_options
)
{
std::optional<epee::net_utils::http::login> http_login{};
if (login)
http_login.emplace(login->username, login->password.password());
m_rpc_client = std::make_unique<tools::t_rpc_client>(ip, port, std::move(http_login), ssl_options);
// FIXME: make ssl argument here optional (and default to disabled)
m_rpc_client = std::make_unique<tools::t_rpc_client>(ip, port, std::move(http_login), epee::net_utils::ssl_support_t::e_ssl_support_disabled);
}
bool rpc_command_executor::print_checkpoints(uint64_t start_height, uint64_t end_height, bool print_json)
@ -626,7 +626,6 @@ bool rpc_command_executor::print_connections() {
tools::msg_writer() << std::setw(30) << std::left << "Remote Host"
<< std::setw(8) << "Type"
<< std::setw(6) << "SSL"
<< std::setw(20) << "Peer id"
<< std::setw(20) << "Support Flags"
<< std::setw(30) << "Recv/Sent (inactive,sec)"
@ -647,7 +646,6 @@ bool rpc_command_executor::print_connections() {
//<< std::setw(30) << std::left << in_out
<< std::setw(30) << std::left << address
<< std::setw(8) << (get_address_type_name((epee::net_utils::address_type)info.address_type))
<< std::setw(6) << (info.ssl ? "yes" : "no")
<< std::setw(20) << info.peer_id
<< std::setw(20) << info.support_flags
<< std::setw(30) << std::to_string(info.recv_count) + "(" + std::to_string(count_seconds(info.recv_idle_time)) + ")/" + std::to_string(info.send_count) + "(" + std::to_string(count_seconds(info.send_idle_time)) + ")"

View File

@ -56,7 +56,6 @@ public:
uint32_t ip
, uint16_t port
, const std::optional<tools::login>& user
, const epee::net_utils::ssl_options_t& ssl_options
);
/// Executor for local daemon RPC
rpc_command_executor(cryptonote::rpc::core_rpc_server& rpc_server)

View File

@ -88,6 +88,8 @@ target_link_libraries(daemon_messages
extra)
target_link_libraries(daemon_rpc_server
PUBLIC
uWebSockets
PRIVATE
rpc
daemon_messages

View File

@ -153,9 +153,9 @@ namespace cryptonote { namespace rpc {
static_assert(std::is_same<Response, invoke_return_type>::value,
"Unable to register RPC command: core_rpc_server::invoke(Request) is not defined or does not return a Response");
auto cmd = std::make_shared<rpc_command>();
constexpr bool binary = std::is_base_of<BINARY, RPC>::value;
cmd->is_public = std::is_base_of<PUBLIC, RPC>::value;
cmd->is_binary = binary;
cmd->is_public = std::is_base_of_v<PUBLIC, RPC>;
cmd->is_binary = std::is_base_of_v<BINARY, RPC>;
cmd->is_legacy = std::is_base_of_v<LEGACY, RPC>;
cmd->invoke = [](rpc_request&& request, core_rpc_server& server) {
reg_helper<RPC> helper;
Response res = server.invoke(helper.load(request), std::move(request.context));
@ -203,7 +203,7 @@ namespace cryptonote { namespace rpc {
{
command_line::add_arg(desc, arg_bootstrap_daemon_address);
command_line::add_arg(desc, arg_bootstrap_daemon_login);
cryptonote::rpc_args::init_options(desc, true);
cryptonote::rpc_args::init_options(desc);
}
//------------------------------------------------------------------------------------------------------------------------------
core_rpc_server::core_rpc_server(
@ -1329,47 +1329,6 @@ namespace cryptonote { namespace rpc {
std::vector<crypto::hash> tx_pool_hashes;
m_core.get_pool().get_transaction_hashes(tx_pool_hashes, context.admin);
if (req.long_poll)
{
/** FIXME: this needs to go into HTTP RPC-specific layer
*
if (m_max_long_poll_connections <= 0)
{
// Essentially disable long polling by making the wallet long polling thread go to sleep due to receiving this message
res.status = STATUS_TX_LONG_POLL_MAX_CONNECTIONS;
return res;
}
crypto::hash checksum = {};
for (crypto::hash const &hash : tx_pool_hashes) crypto::hash_xor(checksum, hash);
if (req.tx_pool_checksum == checksum)
{
size_t tx_count_before = tx_pool_hashes.size();
time_t before = time(nullptr);
std::unique_lock<std::mutex> lock(m_core.m_long_poll_mutex);
if ((m_long_poll_active_connections + 1) > m_max_long_poll_connections)
{
res.status = STATUS_TX_LONG_POLL_MAX_CONNECTIONS;
return res;
}
m_long_poll_active_connections++;
bool condition_activated = m_core.m_long_poll_wake_up_clients.wait_for(lock, long_poll_timeout, [this, tx_count_before]() {
size_t tx_count_after = m_core.get_pool().get_transactions_count();
return tx_count_before != tx_count_after;
});
m_long_poll_active_connections--;
if (!condition_activated)
{
res.status = STATUS_TX_LONG_POLL_TIMED_OUT;
return res;
}
}
*/
}
res.tx_hashes = std::move(tx_pool_hashes);
res.status = STATUS_OK;
return res;

View File

@ -59,8 +59,6 @@ class variables_map;
namespace cryptonote { namespace rpc {
static constexpr auto long_poll_timeout = 15s;
/// Exception when trying to invoke an RPC command that indicate a parameter parse failure (will
/// give an invalid params error for JSON-RPC, for example).
struct parse_error : std::runtime_error { using std::runtime_error::runtime_error; };
@ -108,9 +106,9 @@ namespace cryptonote { namespace rpc {
// The RPC engine source of the request, i.e. internal, HTTP, LMQ
rpc_source source = rpc_source::internal;
// A free-form identifier identifiying the remote address of the request; this might be IP:PORT,
// or could contain a pubkey, or ...
std::string_view remote;
// A free-form identifier (meant for humans) identifiying the remote address of the request;
// this might be IP:PORT, or could contain a pubkey, or ...
std::string remote;
};
struct rpc_request {
@ -139,6 +137,7 @@ namespace cryptonote { namespace rpc {
std::string(*invoke)(rpc_request&&, core_rpc_server&);
bool is_public; // callable via restricted RPC
bool is_binary; // only callable at /name (for HTTP RPC), and binary data, not JSON.
bool is_legacy; // callable at /name (for HTTP RPC), even though it is JSON (for backwards compat).
};
/// RPC command registration; to add a new command, define it in core_rpc_server_commands_defs.h
@ -178,6 +177,10 @@ namespace cryptonote { namespace rpc {
static void init_options(boost::program_options::options_description& desc);
void init(const boost::program_options::variables_map& vm);
/// Returns a reference to the owning cryptonote core object
core& get_core() { return m_core; }
const core& get_core() const { return m_core; }
network_type nettype() const { return m_core.get_nettype(); }
GET_HEIGHT::response invoke(GET_HEIGHT::request&& req, rpc_context context);

View File

@ -1067,6 +1067,8 @@ namespace rpc {
{
static constexpr auto names() { return NAMES("get_transaction_pool_hashes.bin"); }
static constexpr std::chrono::seconds long_poll_timeout{15};
struct request
{
bool long_poll; // Optional: If true, this call is blocking until timeout OR tx pool has changed since the last query. TX pool change is detected by comparing the hash of all the hashes in the tx pool. Ignored when using LMQ RPC.

View File

@ -1,29 +1,65 @@
#include "http_server.h"
#include <chrono>
#include <exception>
#include <lokimq/base64.h>
#include <boost/endian/conversion.hpp>
#include "common/string_util.h"
#include "net/jsonrpc_structs.h" // epee
#include "rpc/core_rpc_server_commands_defs.h"
#include "rpc/rpc_args.h"
#include "version.h"
#undef LOKI_DEFAULT_LOG_CATEGORY
#define LOKI_DEFAULT_LOG_CATEGORY "daemon.rpc"
namespace cryptonote { namespace rpc {
namespace cryptonote::rpc {
const command_line::arg_descriptor<std::string, false, true, 2> http_server::arg_rpc_bind_port = {
/// Checks an Authorization header for Basic login credentials.
///
/// We don't support Digest because it it is deprecated, expensive, and useless: any
/// authentication should either be constrained to a localhost connection or done over HTTPS (in
/// which case Basic is perfectly fine). It's expensive in that it requires multiple requests in
/// order to request a nonce, and requires considerable code to proper support (e.g. with nonce
/// tracking, etc.). Given that it adds nothing security-wise it it is not worth supporting.
///
/// Takes the auth header and a callback to invoke to check the username/password which should
/// return true if the user is allowed, false if denied. The callback should be callable with two
/// std::string_view's: username and password.
template <typename Callback>
std::optional<std::string_view> check_authorization(std::string_view auth_header, Callback check_login) {
constexpr std::optional<std::string_view> fail = "Basic realm=\"lokid rpc\", charset=\"UTF-8\""sv;
auto parts = tools::split_any(auth_header, " \t\r\n", true);
if (parts.size() < 2 || parts[0] != "Basic"sv || !lokimq::is_base64(parts[1]))
return fail;
auto login = lokimq::from_base64(parts[1]);
auto colon = login.find(':');
if (colon == std::string_view::npos)
return fail;
if (check_login(std::string_view{login}.substr(0, colon), std::string_view{login}.substr(colon+1)))
return std::nullopt;
return fail;
}
const command_line::arg_descriptor<uint16_t, false, true, 2> http_server::arg_rpc_bind_port = {
"rpc-bind-port"
, "Port for RPC server"
, std::to_string(config::RPC_DEFAULT_PORT)
, config::RPC_DEFAULT_PORT
, {{ &cryptonote::arg_testnet_on, &cryptonote::arg_stagenet_on }}
, [](std::array<bool, 2> testnet_stagenet, bool defaulted, std::string val)->std::string {
if (testnet_stagenet[0] && defaulted)
return std::to_string(config::testnet::RPC_DEFAULT_PORT);
else if (testnet_stagenet[1] && defaulted)
return std::to_string(config::stagenet::RPC_DEFAULT_PORT);
return val;
, [](std::array<bool, 2> testnet_stagenet, bool defaulted, uint16_t val) {
auto [testnet, stagenet] = testnet_stagenet;
return
(defaulted && testnet) ? config::testnet::RPC_DEFAULT_PORT :
(defaulted && stagenet) ? config::stagenet::RPC_DEFAULT_PORT :
val;
}
};
const command_line::arg_descriptor<std::string> http_server::arg_rpc_restricted_bind_port = {
const command_line::arg_descriptor<uint16_t> http_server::arg_rpc_restricted_bind_port = {
"rpc-restricted-bind-port"
, "Port for restricted RPC server"
, ""
, 0
};
const command_line::arg_descriptor<bool> http_server::arg_restricted_rpc = {
@ -38,16 +74,7 @@ namespace cryptonote { namespace rpc {
, false
};
//
// Loki
//
const command_line::arg_descriptor<int> http_server::arg_rpc_long_poll_connections = {
"rpc-long-poll-connections"
, "Number of RPC connections allocated for long polling wallet queries to the TX pool"
, 16
};
constexpr int http_server::DEFAULT_RPC_THREADS;
namespace { void long_poll_trigger(cryptonote::tx_memory_pool&); }
//-----------------------------------------------------------------------------------
void http_server::init_options(boost::program_options::options_description& desc)
@ -56,35 +83,8 @@ namespace cryptonote { namespace rpc {
command_line::add_arg(desc, arg_rpc_restricted_bind_port);
command_line::add_arg(desc, arg_restricted_rpc);
command_line::add_arg(desc, arg_public_node);
command_line::add_arg(desc, arg_rpc_long_poll_connections);
}
//------------------------------------------------------------------------------------------------------------------------------
bool http_server::init(
const boost::program_options::variables_map& vm
, const bool restricted
, const std::string& port
)
{
m_restricted = restricted;
m_net_server.set_threads_prefix("RPC");
m_max_long_poll_connections = command_line::get_arg(vm, arg_rpc_long_poll_connections);
auto rpc_config = cryptonote::rpc_args::process(vm, true);
if (!rpc_config)
return false;
std::optional<epee::net_utils::http::login> http_login{};
if (rpc_config->login)
http_login.emplace(std::move(rpc_config->login->username), std::move(rpc_config->login->password).password());
auto rng = [](size_t len, uint8_t *ptr){ return crypto::rand(len, ptr); };
return epee::http_server_impl_base<http_server, connection_context>::init(
rng, std::move(port), std::move(rpc_config->bind_ip),
std::move(rpc_config->bind_ipv6_address), std::move(rpc_config->use_ipv6), std::move(rpc_config->require_ipv4),
std::move(rpc_config->access_control_origins), std::move(http_login), std::move(rpc_config->ssl_options)
);
cryptonote::long_poll_trigger = long_poll_trigger;
}
static constexpr http_response_code
@ -92,181 +92,648 @@ namespace cryptonote { namespace rpc {
HTTP_BAD_REQUEST{400, "Bad Request"sv},
HTTP_FORBIDDEN{403, "Forbidden"sv},
HTTP_NOT_FOUND{404, "Not Found"sv},
HTTP_ERROR{500, "Internal Server Error"sv};
HTTP_ERROR{500, "Internal Server Error"sv},
HTTP_SERVICE_UNAVAILABLE{503, "Service Unavailable"sv};
bool http_server::handle_http_request(
const epee::net_utils::http::http_request_info& query_info,
epee::net_utils::http::http_response_info& response,
connection_context& context)
{
std::chrono::steady_clock::time_point start;
bool time_logging = LOG_ENABLED(Debug);
if (time_logging)
start = std::chrono::steady_clock::now();
std::pair<int, std::string_view> http_status = HTTP_ERROR;
std::string exception;
try {
http_status = handle_http(query_info, response, context);
} catch (const std::exception& e) {
exception = ", request raised an exception: "s + e.what();
} catch (...) {
exception = ", request raised an unknown exception";
}
response.m_response_code = http_status.first;
response.m_response_comment = std::string{http_status.second};
std::string elapsed;
if (time_logging)
{
auto dur = std::chrono::steady_clock::now() - start;
std::ostringstream el;
el << ", in ";
el.precision(3);
if (dur >= 1s)
el << std::chrono::duration_cast<std::chrono::milliseconds>(dur).count() / 1000. << 's';
else if (dur >= 1ms)
el << std::chrono::duration_cast<std::chrono::microseconds>(dur).count() / 1000. << "ms";
else if (dur >= 1us)
el << std::chrono::duration_cast<std::chrono::nanoseconds>(dur).count() / 1000. << "us";
else
el << std::chrono::duration_cast<std::chrono::nanoseconds>(dur).count() << "ns";
elapsed = el.str();
}
MLOG(exception.empty() ? el::Level::Info : el::Level::Warning,
"HTTP [" << context.m_remote_address.host_str() << "] " << query_info.m_http_method_str << " " << query_info.m_URI <<
" >>> " << http_status.first << " " << http_status.second <<
exception << elapsed);
return true;
// Sends an error response and finalizes the response.
void http_server::error_response(
HttpResponse& res,
http_response_code code,
std::optional<std::string_view> body) const {
res.writeStatus(std::to_string(code.first) + " " + std::string{code.second});
res.writeHeader("Server", m_server_header);
res.writeHeader("Content-Type", "text/plain");
if (body)
res.end(*body);
else
res.end(std::string{code.second} + "\n");
}
static http_response_code json_rpc_error(int code, std::string message, std::string& body)
// Similar to the above, but for JSON errors (which are 200 OK + error embedded in JSON)
void http_server::jsonrpc_error_response(HttpResponse& res, int code, std::string message, std::optional<epee::serialization::storage_entry> id) const
{
epee::json_rpc::error_response rsp;
rsp.jsonrpc = "2.0";
if (id)
rsp.id = *id;
rsp.error.code = code;
rsp.error.message = std::move(message);
std::string body;
epee::serialization::store_t_to_json(rsp, body);
return HTTP_OK;
res.writeStatus("200 OK"sv);
res.writeHeader("Server", m_server_header);
res.writeHeader("Content-Type", "application/json");
res.end(body);
}
http_response_code http_server::handle_http(
const epee::net_utils::http::http_request_info& query_info,
epee::net_utils::http::http_response_info& response_info,
connection_context& context)
//------------------------------------------------------------------------------------------------------------------------------
http_server::http_server(
core_rpc_server& server,
const boost::program_options::variables_map& vm,
const bool restricted,
uint16_t port
) : m_server{server}, m_restricted{restricted}
{
auto uri = query_info.m_URI.size() > 0 && query_info.m_URI[0] == '/' ? query_info.m_URI.substr(1) : query_info.m_URI;
// uWS is designed to work from a single thread, which is good (we pull off the requests and
// then stick them into the LMQ job queue to be scheduled along with other jobs). But as a
// consequence, we need to create everything inside that thread. We *also* need to get the
// (thread local) event loop pointer back from the thread so that we can shut it down later
// (injecting a callback into it is one of the few thread-safe things we can do across threads).
//
// Things we need in the owning thread, fulfilled from the http thread:
auto remote = context.m_remote_address.str();
// - the uWS::Loop* for the event loop thread (which is thread_local). We can get this during
// thread startup, after the thread does basic initialization.
std::promise<uWS::Loop*> loop_promise;
auto loop_future = loop_promise.get_future();
// - the us_listen_socket_t* on which the server is listening. We can't get this until we
// actually start listening, so wait until `start()` for it. (We also double-purpose it to
// send back an exception if one fires during startup).
std::promise<std::vector<us_listen_socket_t*>> startup_success_promise;
m_startup_success = startup_success_promise.get_future();
// Things we need to send from the owning thread to the event loop thread:
// - a signal when the thread should bind to the port and start the event loop (when we call
// start()).
//m_startup_promise
m_rpc_thread = std::thread{[this, rpc_config=cryptonote::rpc_args::process(vm), port] (
std::promise<uWS::Loop*> loop_promise,
std::future<bool> startup_future,
std::promise<std::vector<us_listen_socket_t*>> startup_success) {
uWS::App http;
try {
create_rpc_endpoints(http);
} catch (...) {
loop_promise.set_exception(std::current_exception());
}
loop_promise.set_value(uWS::Loop::get());
if (!startup_future.get())
// False means cancel, i.e. we got destroyed/shutdown without start() being called
return;
std::vector<std::pair<std::string /*addr*/, bool /*required*/>> bind_addr;
if (!rpc_config.bind_ip.empty())
bind_addr.emplace_back(rpc_config.bind_ip, rpc_config.require_ipv4);
if (rpc_config.use_ipv6 && !rpc_config.bind_ipv6_address.empty())
bind_addr.emplace_back(rpc_config.bind_ipv6_address, true);
std::vector<us_listen_socket_t*> listening;
try {
bool bad = false;
int good = 0;
for (const auto& [addr, required] : bind_addr)
http.listen(addr, port, [&listening, req=required, &good, &bad](us_listen_socket_t* sock) {
listening.push_back(sock);
if (sock != nullptr) good++;
else if (req) bad = true;
});
if (!good || bad) {
std::ostringstream error;
error << "RPC HTTP server failed to bind; ";
if (listening.empty()) error << "no valid bind address(es) given";
else {
error << "tried to bind to:";
for (const auto& [addr, required] : bind_addr)
error << ' ' << addr << ':' << port;
}
throw std::logic_error(error.str());
}
} catch (...) {
startup_success.set_exception(std::current_exception());
return;
}
startup_success.set_value(std::move(listening));
http.run();
}, std::move(loop_promise), m_startup_promise.get_future(), std::move(startup_success_promise)};
m_loop = loop_future.get();
}
void http_server::create_rpc_endpoints(uWS::App& http)
{
auto access_denied = [this](HttpResponse* res, HttpRequest* req) {
MINFO("Forbidden HTTP request for restricted endpoint " << req->getMethod() << " " << req->getUrl());
error_response(*res, HTTP_FORBIDDEN);
};
for (auto& [name, call] : rpc_commands) {
if (call->is_legacy || call->is_binary) {
if (!call->is_public && m_restricted)
http.any("/" + name, access_denied);
else
http.any("/" + name, [this, &call=*call](HttpResponse* res, HttpRequest* req) {
if (m_login && !check_auth(*req, *res))
return;
handle_base_request(*res, *req, call);
});
}
}
http.post("/json_rpc", [this](HttpResponse* res, HttpRequest* req) {
if (m_login && !check_auth(*req, *res))
return;
handle_json_rpc_request(*res, *req);
});
// Fallback to send a 404 for anything else:
http.any("/*", [this](HttpResponse* res, HttpRequest* req) {
if (m_login && !check_auth(*req, *res))
return;
MINFO("Invalid HTTP request for " << req->getMethod() << " " << req->getUrl());
error_response(*res, HTTP_NOT_FOUND);
});
}
bool http_server::check_auth(HttpRequest& req, HttpResponse& res)
{
if (auto www_auth = check_authorization(req.getHeader("authorization"),
[this] (const std::string_view user, const std::string_view pass) {
return user == m_login->username && pass == m_login->password.password().view(); }))
{
res.writeStatus("401 Unauthorized");
res.writeHeader("Server", m_server_header);
res.writeHeader("WWW-Authenticate", *www_auth);
res.writeHeader("Content-Type", "text/plain");
if (req.getMethod() != "HEAD"sv)
res.end("Login required");
return false;
}
return true;
}
namespace {
struct call_data {
http_server& http;
core_rpc_server& core_rpc;
HttpResponse& res;
std::string uri;
const rpc_command* call{nullptr};
rpc_request request{};
bool aborted{false};
bool replied{false};
bool jsonrpc{false};
std::string jsonrpc_id; // pre-formatted json value
// If we have to drop the request because we are overloaded we want to reply with an error (so
// that we close the connection instead of leaking it and leaving it hanging). We don't do
// this, of course, if the request got aborted and replied to.
~call_data() {
if (replied || aborted) return;
http.loop_defer([&http=http, &res=res, jsonrpc=jsonrpc] {
if (jsonrpc)
http.jsonrpc_error_response(res, -32003, "Server busy, try again later");
else
http.error_response(res, HTTP_SERVICE_UNAVAILABLE, "Server busy, try again later");
});
}
call_data(const call_data&) = delete;
call_data(call_data&&) = delete;
call_data& operator=(const call_data&) = delete;
call_data& operator=(call_data&&) = delete;
};
// Queues a response for the HTTP thread to handle; the response can be in multiple string pieces
// to be concatenated together.
void queue_response(std::shared_ptr<call_data> data, std::vector<std::string> body)
{
auto& http = data->http;
data->replied = true;
http.loop_defer([data=std::move(data), body=std::move(body)] {
if (data->aborted)
return;
data->res.cork([&res=data->res, &svr=data->http.server_header(), body=std::move(body), binary=data->call->is_binary] {
res.writeHeader("Server", svr);
res.writeHeader("Content-Type", binary ? "application/octet-stream"sv : "application/json"sv);
for (const auto& piece : body)
res.write(piece);
res.end();
});
});
}
// Wrapper around the above that takes a single string
void queue_response(std::shared_ptr<call_data> data, std::string body)
{
std::vector<std::string> b;
b.push_back(std::move(body));
queue_response(std::move(data), std::move(b));
}
void invoke_txpool_hashes_bin(std::shared_ptr<call_data> data);
// Invokes the actual RPC request; this is called (via lokimq) from some random LMQ worker thread,
// which means we can't just write our reply; instead we have to post it to the uWS loop.
void invoke_rpc(std::shared_ptr<call_data> dataptr)
{
auto& data = *dataptr;
if (data.aborted) return;
// Replace the default tx pool hashes callback with our own (which adds long poll support):
if (std::string_view{data.uri}.substr(1) == rpc::GET_TRANSACTION_POOL_HASHES_BIN::names()[0])
return invoke_txpool_hashes_bin(std::move(dataptr));
const bool time_logging = LOG_ENABLED(Debug);
std::chrono::steady_clock::time_point start;
if (time_logging)
start = std::chrono::steady_clock::now();
std::vector<std::string> result;
result.reserve(data.jsonrpc ? 3 : 1);
if (data.jsonrpc)
{
result.emplace_back(R"({"jsonrpc":"2.0","id":)");
result.back() += data.jsonrpc_id;
result.back() += R"(,"result":)";
}
int json_error = -32603;
std::string json_message = "Internal error";
std::string http_message;
try {
result.push_back(data.call->invoke(std::move(data.request), data.core_rpc));
json_error = 0;
} catch (const parse_error& e) {
// This isn't really WARNable as it's the client fault; log at info level instead.
MINFO("HTTP RPC request '" << data.uri << "' called with invalid/unparseable data: " << e.what());
json_error = -32602;
http_message = "Unable to parse request: "s + e.what();
json_message = "Invalid params";
} catch (const rpc_error& e) {
MWARNING("HTTP RPC request '" << data.uri << "' failed with: " << e.what());
json_error = e.code;
json_message = e.message;
http_message = e.message;
} catch (const std::exception& e) {
MWARNING("HTTP RPC request '" << data.uri << "' raised an exception: " << e.what());
} catch (...) {
MWARNING("HTTP RPC request '" << data.uri << "' raised an unknown exception");
}
if (json_error != 0) {
data.replied = true;
data.http.loop_defer([data=std::move(dataptr), json_error, msg=std::move(data.jsonrpc ? json_message : http_message)] {
if (data->aborted) return;
if (data->jsonrpc)
data->http.jsonrpc_error_response(data->res, json_error, msg);
else
data->http.error_response(data->res, HTTP_ERROR, msg.empty() ? std::nullopt : std::make_optional<std::string_view>(msg));
});
return;
}
if (data.jsonrpc)
result.emplace_back("}\n");
std::string call_duration;
if (time_logging)
call_duration = " in " + tools::friendly_duration(std::chrono::steady_clock::now() - start);
MINFO("HTTP RPC " << data.uri << " [" << data.request.context.remote << "] OK (" << result.size() << " bytes)" << call_duration);
queue_response(std::move(dataptr), std::move(result));
}
std::string pool_hashes_response(std::vector<crypto::hash>&& pool_hashes) {
GET_TRANSACTION_POOL_HASHES_BIN::response res{};
res.tx_hashes = std::move(pool_hashes);
res.status = STATUS_OK;
std::string response;
epee::serialization::store_t_to_binary(res, response);
return response;
}
std::list<std::pair<std::shared_ptr<call_data>, std::chrono::steady_clock::time_point>> long_pollers;
std::mutex long_poll_mutex;
// HTTP-only long-polling support for the transaction pool hashes command
void invoke_txpool_hashes_bin(std::shared_ptr<call_data> data) {
GET_TRANSACTION_POOL_HASHES_BIN::request req{};
if (!epee::serialization::load_t_from_binary(req, std::get<std::string_view>(data->request.body)))
throw parse_error{"Failed to parse binary data parameters"};
std::vector<crypto::hash> pool_hashes;
data->core_rpc.get_core().get_pool().get_transaction_hashes(pool_hashes, data->request.context.admin);
if (req.long_poll)
{
crypto::hash checksum{};
for (const auto& h : pool_hashes) checksum ^= h;
if (req.tx_pool_checksum == checksum) {
// Hashes match, which means we need to defer this request until later.
std::lock_guard lock{long_poll_mutex};
MTRACE("Deferring long poll request from " << data->request.context.remote << ": long polling requested and remote's checksum matches current pool (" << checksum << ")");
long_pollers.emplace_back(std::move(data), std::chrono::steady_clock::now() + GET_TRANSACTION_POOL_HASHES_BIN::long_poll_timeout);
return;
}
MTRACE("Ignoring long poll request from " << data->request.context.remote << ": pool hash mismatch (remote: " << req.tx_pool_checksum << ", local: " << checksum << ")");
}
// Either not a long poll request or checksum didn't match
queue_response(std::move(data), pool_hashes_response(std::move(pool_hashes)));
}
// This get invoked (from cryptonote_core.cpp) whenever the mempool is added to. We queue
// responses for everyone currently waiting.
void long_poll_trigger(tx_memory_pool &pool) {
std::lock_guard lock{long_poll_mutex};
if (long_pollers.empty())
return;
MDEBUG("TX pool changed; sending tx pool to " << long_pollers.size() << " pending long poll connections");
std::optional<std::string> body_public, body_admin;
for (auto& [dataptr, expiry]: long_pollers)
{
auto& data = *dataptr;
auto& body = data.request.context.admin ? body_admin : body_public;
if (!body)
{
std::vector<crypto::hash> pool_hashes;
pool.get_transaction_hashes(pool_hashes, data.request.context.admin);
body = pool_hashes_response(std::move(pool_hashes));
}
MTRACE("Sending deferred long poll pool update to " << data.request.context.remote);
queue_response(std::move(dataptr), *body);
}
long_pollers.clear();
}
std::string long_poll_timeout_body;
// Called periodically to clear expired Starts up a periodic timer for checking for expired long poll requests. We run this only once
// a second because we don't really care if we time out at *precisely* 15 seconds.
void long_poll_process_timeouts() {
std::lock_guard lock{long_poll_mutex};
if (long_pollers.empty())
return;
if (long_poll_timeout_body.empty())
{
GET_TRANSACTION_POOL_HASHES_BIN::response res{};
res.status = STATUS_TX_LONG_POLL_TIMED_OUT;
epee::serialization::store_t_to_binary(res, long_poll_timeout_body);
}
int count = 0;
auto now = std::chrono::steady_clock::now();
for (auto it = long_pollers.begin(); it != long_pollers.end(); )
{
if (it->second < now)
{
MTRACE("Sending long poll timeout to " << it->first->request.context.remote);
queue_response(std::move(it->first), long_poll_timeout_body);
it = long_pollers.erase(it);
count++;
}
else
++it;
}
if (count > 0)
MDEBUG("Timed out " << count << " long poll connections");
else
MTRACE("None of " << long_pollers.size() << " established long poll connections reached timeout");
}
std::string get_remote_address(HttpResponse& res) {
std::ostringstream result;
bool first = true;
auto addr = res.getRemoteAddress();
if (addr.size() == 4)
{ // IPv4, packed into bytes
for (auto c : addr) {
if (first) first = false;
else result << '.';
result << +static_cast<uint8_t>(c);
}
}
else if (addr.size() == 16)
{
// IPv6, packed into bytes. Interpret as a series of 8 big-endian shorts and convert to hex,
// joined with :. But we also want to drop leading insignificant 0's (i.e. '34f' instead of
// '034f'), and we want to collapse the longest sequence of 0's that we come across (so that,
// for example, localhost becomes `::1` instead of `0:0:0:0:0:0:0:1`).
std::array<uint16_t, 8> a;
std::memcpy(a.data(), addr.data(), 16);
for (auto& x : a) boost::endian::big_to_native_inplace(x);
size_t zero_start = 0, zero_end = 0;
for (size_t i = 0, start = 0, end = 0; i < a.size(); i++) {
if (a[i] != 0)
continue;
if (end != i) // This zero value starts a new zero sequence
start = i;
end = i + 1;
if (end - start > zero_end - zero_start)
{
zero_end = end;
zero_start = start;
}
}
result << '[' << std::hex;
for (size_t i = 0; i < a.size(); i++)
{
if (i >= zero_start && i < zero_end)
{
if (i == zero_start) result << "::";
continue;
}
if (i > 0 && i != zero_end)
result << ':';
result << a[i];
}
result << ']';
}
else
result << "{unknown:" << lokimq::to_hex(addr) << "}";
return result.str();
}
} // anonymous namespace
void http_server::handle_base_request(
HttpResponse& res,
HttpRequest& req,
const rpc_command& call)
{
std::shared_ptr<call_data> data{new call_data{*this, m_server, res, std::string{req.getUrl()}, &call}};
auto& request = data->request;
request.context.admin = !m_restricted;
request.context.source = rpc_source::http;
request.context.remote = remote;
request.context.remote = get_remote_address(res);
if (uri == "json_rpc")
return handle_json_rpc_request(query_info, response_info, context, request);
res.onAborted([data] { data->aborted = true; });
res.onData([buffer=""s, data=std::move(data)](std::string_view d, bool done) mutable {
if (!done) {
buffer += d;
return;
}
auto it = rpc_commands.find(uri);
if (it == rpc_commands.end())
return HTTP_NOT_FOUND;
if (buffer.empty())
data->request.body = d; // bypass copying the string_view to a string
else
data->request.body = (buffer += d);
auto& cmd = *it->second;
if (m_restricted && !cmd.is_public)
return HTTP_FORBIDDEN;
request.body = std::move(query_info.m_body);
response_info.m_mime_type = cmd.is_binary ? "application/octet-stream" : "application/json";
response_info.m_header_info.m_content_type = response_info.m_mime_type;
try {
response_info.m_body = cmd.invoke(std::move(request), m_server);
return HTTP_OK;
} catch (const parse_error& e) {
// This isn't really WARNable as it's the client fault; log at info level instead.
MINFO("RPC request for '/" << uri << "' called with invalid/unparseable data: " << e.what());
return HTTP_BAD_REQUEST;
} catch (const rpc_error& e) {
MWARNING("RPC request for '/" << uri << "' failed with: " << e.what() << "; returning 500 error");
return HTTP_ERROR;
} catch (const std::exception& e) {
MWARNING("RPC request '/" << uri << "' request raised an exception: " << e.what());
return HTTP_ERROR;
} catch (...) {
MWARNING("RPC request '/" << uri << "' request raised an unknown exception");
return HTTP_ERROR;
}
auto& lmq = data->core_rpc.get_core().get_lmq();
std::string cat{data->call->is_public ? "rpc" : "admin"};
std::string cmd{"http:" + data->uri}; // Used for LMQ job logging; prefixed with http: so we can distinguish it
std::string remote{data->request.context.remote};
lmq.inject_task(std::move(cat), std::move(cmd), std::move(remote), [data=std::move(data)] { invoke_rpc(std::move(data)); });
});
}
http_response_code http_server::handle_json_rpc_request(
const epee::net_utils::http::http_request_info& query_info,
epee::net_utils::http::http_response_info& response_info,
connection_context& context,
rpc_request& request)
void http_server::handle_json_rpc_request(HttpResponse& res, HttpRequest& req)
{
auto& body = response_info.m_body;
std::shared_ptr<call_data> data{new call_data{*this, m_server, res, std::string{req.getUrl()}}};
data->jsonrpc = true;
auto& request = data->request;
request.context.admin = !m_restricted;
request.context.source = rpc_source::http;
request.context.remote = get_remote_address(res);
request.body = jsonrpc_params{};
auto& epee_stuff = std::get<jsonrpc_params>(request.body);
auto& ps = epee_stuff.first;
if(!ps.load_from_json(query_info.m_body))
return json_rpc_error(-32700, "Parse error", body);
res.onAborted([data] { data->aborted = true; });
res.onData([buffer=""s, data, restricted=m_restricted](std::string_view d, bool done) mutable {
if (!done) {
buffer += d;
return;
}
epee::serialization::storage_entry id{std::string{}};
ps.get_value("id", id, nullptr);
std::string method;
if(!ps.get_value("method", method, nullptr))
return json_rpc_error(-32600, "Invalid Request", body);
std::string_view body;
if (buffer.empty())
body = d; // bypass copying the string_view to a string
else
body = (buffer += d);
auto it = rpc_commands.find(method);
if (it == rpc_commands.end() || it->second->is_binary)
return json_rpc_error(-32601, "Method not found", body);
auto& epee_stuff = std::get<jsonrpc_params>(data->request.body = jsonrpc_params{});
auto& [ps, st_entry] = epee_stuff;
if(!ps.load_from_json(body))
return data->http.jsonrpc_error_response(data->res, -32700, "Parse error");
const auto& command = *it->second;
if (m_restricted && !command.is_public)
return json_rpc_error(403, "Forbidden; this command is not available over public RPC", body);
epee::serialization::storage_entry id{std::string{}};
ps.get_value("id", id, nullptr);
std::string id_str;
{
std::ostringstream o;
epee::serialization::dump_as_json(o, id, 0 /*indent*/, false /*newlines*/);
id_str = o.str();
}
std::string method;
if(!ps.get_value("method", method, nullptr))
{
MINFO("Invalid JSON RPC request from " << data->request.context.remote << ": no 'method' in request");
return data->http.jsonrpc_error_response(data->res, -32600, "Invalid Request", id);
}
// Try to load "params" into a generic epee value; if it fails (because there is no "params")
// then we will replace it with an empty string to signal that no params were provided.
if (!ps.get_value("params", epee_stuff.second, nullptr))
request.body = ""sv;
auto it = rpc_commands.find(method);
if (it == rpc_commands.end() || it->second->is_binary)
{
MINFO("Invalid JSON RPC request from " << data->request.context.remote << ": method '" << method << "' is invalid");
return data->http.jsonrpc_error_response(data->res, -32601, "Method not found", id);
}
std::string result;
try {
result = command.invoke(std::move(request), m_server);
} catch (const parse_error& e) {
// This isn't really WARNable as it's the client fault; log at info level instead.
MINFO("JSON RPC request for '" << method << "' called with invalid data: " << e.what());
return json_rpc_error(-32602, "Invalid params", body);
} catch (const rpc_error& e) {
MWARNING("JSON RPC request for '" << method << "' failed with: " << e.what());
return json_rpc_error(e.code, e.message, body);
} catch (const std::exception& e) {
MWARNING("json_rpc '" << method << "' request raised an exception: " << e.what());
return json_rpc_error(-32603, "Internal error", body);
} catch (...) {
MWARNING("json_rpc '" << method << "' request raised an unknown exception");
return json_rpc_error(-32603, "Internal error", body);
}
data->call = it->second.get();
if (restricted && !data->call->is_public)
{
MWARNING("Invalid JSON RPC request from " << data->request.context.remote << ": method '" << method << "' is restricted");
return data->http.jsonrpc_error_response(data->res, 403, "Forbidden; this command is not available over public RPC", id);
}
assert(body.empty());
response_info.m_body_pieces.emplace_back(R"({"jsonrpc":"2.0","id":)");
response_info.m_body_pieces.back() += id_str;
response_info.m_body_pieces.back() += R"(,"result":)";
MDEBUG("Incoming JSON RPC request for " << method << " from " << data->request.context.remote);
response_info.m_body_pieces.push_back(std::move(result));
{
std::ostringstream o;
epee::serialization::dump_as_json(o, id, 0 /*indent*/, false /*newlines*/);
data->jsonrpc_id = o.str();
}
response_info.m_body_pieces.push_back("}\n");
return HTTP_OK;
// Try to load "params" into a generic epee value; if it fails (because there is no "params")
// then we replace request.body with an empty string (instead of the epee jsonrpc_params
// alternative) to signal that no params were provided at all.
if (!ps.get_value("params", epee_stuff.second, nullptr))
data->request.body = ""sv;
auto& lmq = data->core_rpc.get_core().get_lmq();
std::string cat{data->call->is_public ? "rpc" : "admin"};
std::string cmd{"jsonrpc:" + method}; // Used for LMQ job logging; prefixed with jsonrpc: so we can distinguish it
std::string remote{data->request.context.remote};
lmq.inject_task(std::move(cat), std::move(cmd), std::move(remote), [data=std::move(data)] { invoke_rpc(std::move(data)); });
});
}
}} // namespace cryptonote::rpc
static std::unordered_set<lokimq::LokiMQ*> timer_started;
void http_server::start()
{
if (m_sent_startup)
throw std::logic_error{"Cannot call http_server::start() more than once"};
auto net = m_server.nettype();
m_server_header = "lokid/"s + (m_restricted ? std::to_string(LOKI_VERSION[0]) : LOKI_VERSION_FULL)
+ (net == MAINNET ? " mainnet" : net == TESTNET ? " testnet" : net == STAGENET ? " stagenet" : net == FAKECHAIN ? " fakenet" : " unknown net");
m_startup_promise.set_value(true);
m_sent_startup = true;
m_listen_socks = m_startup_success.get();
auto& lmq = m_server.get_core().get_lmq();
if (timer_started.insert(&lmq).second)
lmq.add_timer(long_poll_process_timeouts, 1s);
}
void http_server::shutdown(bool join)
{
if (!m_rpc_thread.joinable())
return;
if (!m_sent_shutdown)
{
MTRACE("initiating shutdown");
if (!m_sent_startup)
{
m_startup_promise.set_value(false);
m_sent_startup = true;
}
else if (!m_listen_socks.empty())
{
loop_defer([this] {
MTRACE("closing " << m_listen_socks.size() << " listening sockets");
for (auto* s : m_listen_socks)
us_listen_socket_close(/*ssl=*/false, s);
{
// Destroy any pending long poll connections as well
MTRACE("closing pending long poll requests");
std::lock_guard lock{long_poll_mutex};
for (auto it = long_pollers.begin(); it != long_pollers.end(); )
{
if (&it->first->http != this)
continue; // Belongs to some other http_server instance
it->first->aborted = true;
it->first->res.close();
it = long_pollers.erase(it);
}
}
});
}
m_sent_shutdown = true;
}
MTRACE("joining rpc thread");
if (join)
m_rpc_thread.join();
MTRACE("done shutdown");
}
http_server::~http_server()
{
shutdown(true);
}
} // namespace cryptonote::rpc

View File

@ -31,66 +31,120 @@
#pragma once
#include "net/http_server_impl_base.h"
#include <uWebSockets/App.h>
#include "common/command_line.h"
#include "common/password.h"
#include "core_rpc_server.h"
namespace cryptonote { namespace rpc {
namespace cryptonote::rpc {
using HttpRequest = uWS::HttpRequest;
using HttpResponse = uWS::HttpResponse<false/*SSL*/>;
using http_response_code = std::pair<int, std::string_view>;
/************************************************************************/
/* Core HTTP RPC server */
/************************************************************************/
class http_server: public epee::http_server_impl_base<http_server>
class http_server
{
public:
static constexpr int DEFAULT_RPC_THREADS = 2;
static const command_line::arg_descriptor<std::string, false, true, 2> arg_rpc_bind_port;
static const command_line::arg_descriptor<std::string> arg_rpc_restricted_bind_port;
static const command_line::arg_descriptor<uint16_t, false, true, 2> arg_rpc_bind_port;
static const command_line::arg_descriptor<uint16_t> arg_rpc_restricted_bind_port;
static const command_line::arg_descriptor<bool> arg_restricted_rpc;
static const command_line::arg_descriptor<std::string> arg_rpc_ssl;
static const command_line::arg_descriptor<std::string> arg_rpc_ssl_private_key;
static const command_line::arg_descriptor<std::string> arg_rpc_ssl_certificate;
static const command_line::arg_descriptor<std::string> arg_rpc_ssl_ca_certificates;
static const command_line::arg_descriptor<std::vector<std::string>> arg_rpc_ssl_allowed_fingerprints;
static const command_line::arg_descriptor<bool> arg_rpc_ssl_allow_any_cert;
static const command_line::arg_descriptor<bool> arg_public_node;
static const command_line::arg_descriptor<int> arg_rpc_long_poll_connections;
typedef epee::net_utils::connection_context_base connection_context;
http_server(core_rpc_server& server) : m_server{server} {}
static void init_options(boost::program_options::options_description& desc);
bool init(
http_server(
core_rpc_server& server,
const boost::program_options::variables_map& vm,
const bool restricted,
const std::string& port
);
uint16_t port
);
bool handle_http_request(
const epee::net_utils::http::http_request_info& query_info,
epee::net_utils::http::http_response_info& response,
connection_context& context) override;
~http_server();
http_response_code handle_http(
const epee::net_utils::http::http_request_info& query_info,
epee::net_utils::http::http_response_info& response_info,
connection_context& context);
/// Starts the event loop in the thread handling http requests. Core must have been initialized
/// and LokiMQ started. Will propagate an exception from the thread if startup fails.
void start();
http_response_code handle_json_rpc_request(
const epee::net_utils::http::http_request_info& query_info,
epee::net_utils::http::http_response_info& response_info,
connection_context& context,
rpc_request& request);
/// Closes the http server connection. Can safely be called multiple times, or to abort a
/// startup if called before start().
///
/// \param join - if true, wait for the proxy thread to exit. If false then joining will occur
/// during destruction.
void shutdown(bool join = false);
/// Checks for required authentication, if enabled. If authentication fails, sets a "failed"
/// response and returns false; if authentication isn't required or passes, returns true (and
/// doesn't touch the response).
bool check_auth(HttpRequest& req, HttpResponse& res);
/// Handles a request for a base url, e.g. /foo (but not /json_rpc). `call` is the callback
/// we've already mapped the request to; restricted commands have also already been rejected
/// (unless the RPC is unrestricted).
void handle_base_request(
HttpResponse& res,
HttpRequest& req,
const rpc_command& call);
/// Handles a POST request to /json_rpc.
void handle_json_rpc_request(HttpResponse& res, HttpRequest& req);
// Posts a callback to the uWebSockets thread loop controlling this connection; all writes must
// be done from that thread, and so this method is provided to defer a callback from another
// thread into that one. The function should have signature `void ()`.
template <typename Func>
void loop_defer(Func&& f) {
m_loop->defer(std::forward<Func>(f));
}
// Sends an error response and finalizes the response. If body is empty, uses the default error
// response text.
void error_response(
HttpResponse& res,
http_response_code code,
std::optional<std::string_view> body = std::nullopt) const;
// Similar to the above, but for JSON RPC requests: we send "200 OK" at the HTTP layer; the
// error code and message gets encoded in JSON inside the response body.
void jsonrpc_error_response(
HttpResponse& res,
int code,
std::string message,
std::optional<epee::serialization::storage_entry> = std::nullopt) const;
const std::string& server_header() { return m_server_header; }
int m_max_long_poll_connections;
private:
void create_rpc_endpoints(uWS::App& http);
// The core rpc server which handles the internal requests
core_rpc_server& m_server;
// The uWebSockets event loop pointer (so that we can inject a callback to shut it down)
uWS::Loop* m_loop{nullptr};
// The socket(s) we are listening on
std::vector<us_listen_socket_t*> m_listen_socks;
// The thread in which the uWebSockets event listener is running
std::thread m_rpc_thread;
// A promise we send from outside into the event loop thread to signal it to start. We sent
// "true" to go ahead with binding + starting the event loop, or false to abort.
std::promise<bool> m_startup_promise;
// A future (promise held by the thread) that delivers us the listening uSockets sockets so
// that, when we want to shut down, we can tell uWebSockets to close them (which will then run
// off the end of the event loop). This also doubles to propagate listen exceptions back to us.
std::future<std::vector<us_listen_socket_t*>> m_startup_success;
// Whether we have sent the startup/shutdown signals
bool m_sent_startup{false}, m_sent_shutdown{false};
// An optional required login for this HTTP RPC interface
std::optional<tools::login> m_login;
// Whether this is restricted, i.e. public. Unrestricted allows admin commands.
bool m_restricted;
std::atomic<int> m_long_poll_active_connections;
// Cached string we send for the Server header; full version if unrestricted, just the major
// version if restricted
std::string m_server_header;
};
}} // namespace cryptonote::rpc
} // namespace cryptonote::rpc

View File

@ -132,7 +132,7 @@ lmq_rpc::lmq_rpc(cryptonote::core& core, core_rpc_server& rpc, const boost::prog
auth.emplace(std::move(pk), AuthLevel::basic);
// basic (non-admin) rpc commands go into the "rpc." category (e.g. 'rpc.get_info')
lmq.add_category("rpc", AuthLevel::basic);
lmq.add_category("rpc", AuthLevel::basic, 0 /*no reserved threads*/, 1000 /*max queued requests*/);
// Admin rpc commands go into "admin.". We also always keep one (potential) thread reserved for
// admin RPC commands; that way even if there are loads of basic commands being processed we'll

View File

@ -28,64 +28,16 @@
//
#include "rpc_args.h"
#include <boost/algorithm/string.hpp>
#include <boost/version.hpp>
#include <boost/asio/ip/address.hpp>
#include "common/command_line.h"
#include "common/i18n.h"
#include "hex.h"
#include "common/string_util.h"
using namespace std::literals;
namespace cryptonote
{
namespace
{
std::optional<epee::net_utils::ssl_options_t> do_process_ssl(const boost::program_options::variables_map& vm, const rpc_args::descriptors& arg, const bool any_cert_option)
{
bool ssl_required = false;
epee::net_utils::ssl_options_t ssl_options = epee::net_utils::ssl_support_t::e_ssl_support_enabled;
if (any_cert_option && command_line::get_arg(vm, arg.rpc_ssl_allow_any_cert))
ssl_options.verification = epee::net_utils::ssl_verification_t::none;
else
{
std::string ssl_ca_file = command_line::get_arg(vm, arg.rpc_ssl_ca_certificates);
const std::vector<std::string> ssl_allowed_fingerprints = command_line::get_arg(vm, arg.rpc_ssl_allowed_fingerprints);
std::vector<std::vector<uint8_t>> allowed_fingerprints{ ssl_allowed_fingerprints.size() };
std::transform(ssl_allowed_fingerprints.begin(), ssl_allowed_fingerprints.end(), allowed_fingerprints.begin(), epee::from_hex::vector);
for (const auto &fpr: allowed_fingerprints)
{
if (fpr.size() != SSL_FINGERPRINT_SIZE)
{
MERROR("SHA-256 fingerprint should be " BOOST_PP_STRINGIZE(SSL_FINGERPRINT_SIZE) " bytes long.");
return std::nullopt;
}
}
if (!allowed_fingerprints.empty() || !ssl_ca_file.empty())
{
ssl_required = true;
ssl_options = epee::net_utils::ssl_options_t{
std::move(allowed_fingerprints), std::move(ssl_ca_file)
};
if (command_line::get_arg(vm, arg.rpc_ssl_allow_chained))
ssl_options.verification = epee::net_utils::ssl_verification_t::user_ca;
}
}
// user specified CA file or fingeprints implies enabled SSL by default
if (!ssl_required && !epee::net_utils::ssl_support_from_string(ssl_options.support, command_line::get_arg(vm, arg.rpc_ssl)))
{
MERROR("Invalid argument for " << std::string(arg.rpc_ssl.name));
return std::nullopt;
}
ssl_options.auth = epee::net_utils::ssl_authentication_t{
command_line::get_arg(vm, arg.rpc_ssl_private_key), command_line::get_arg(vm, arg.rpc_ssl_certificate)
};
return {std::move(ssl_options)};
}
} // anonymous
rpc_args::descriptors::descriptors()
: rpc_bind_ip({"rpc-bind-ip", rpc_args::tr("Specify IP to bind RPC server"), "127.0.0.1"})
@ -95,13 +47,6 @@ namespace cryptonote
, rpc_login({"rpc-login", rpc_args::tr("Specify username[:password] required for RPC server"), "", true})
, confirm_external_bind({"confirm-external-bind", rpc_args::tr("Confirm rpc-bind-ip value is NOT a loopback (local) IP")})
, rpc_access_control_origins({"rpc-access-control-origins", rpc_args::tr("Specify a comma separated list of origins to allow cross origin resource sharing"), ""})
, rpc_ssl({"rpc-ssl", rpc_args::tr("Enable SSL on RPC connections: enabled|disabled|autodetect"), "autodetect"})
, rpc_ssl_private_key({"rpc-ssl-private-key", rpc_args::tr("Path to a PEM format private key"), ""})
, rpc_ssl_certificate({"rpc-ssl-certificate", rpc_args::tr("Path to a PEM format certificate"), ""})
, rpc_ssl_ca_certificates({"rpc-ssl-ca-certificates", rpc_args::tr("Path to file containing concatenated PEM format certificate(s) to replace system CA(s)."), ""})
, rpc_ssl_allowed_fingerprints({"rpc-ssl-allowed-fingerprints", rpc_args::tr("List of certificate fingerprints to allow")})
, rpc_ssl_allow_chained({"rpc-ssl-allow-chained", rpc_args::tr("Allow user (via --rpc-ssl-certificates) chain certificates"), false})
, rpc_ssl_allow_any_cert({"rpc-ssl-allow-any-cert", rpc_args::tr("Allow any peer certificate"), false})
, rpc_public_node({"public-node", rpc_args::tr("Allow other users to use the node as a remote (restricted RPC mode, view-only commands) and advertise it over P2P"), false})
, zmq_rpc_bind_ip({"zmq-rpc-bind-ip", rpc_args::tr("Deprecated option, ignored."), ""})
, zmq_rpc_bind_port({"zmq-rpc-bind-port", rpc_args::tr("Deprecated option, ignored."), ""})
@ -109,7 +54,7 @@ namespace cryptonote
const char* rpc_args::tr(const char* str) { return i18n_translate(str, "cryptonote::rpc_args"); }
void rpc_args::init_options(boost::program_options::options_description& desc, const bool any_cert_option)
void rpc_args::init_options(boost::program_options::options_description& desc)
{
const descriptors arg{};
command_line::add_arg(desc, arg.rpc_bind_ip);
@ -119,17 +64,27 @@ namespace cryptonote
command_line::add_arg(desc, arg.rpc_login);
command_line::add_arg(desc, arg.confirm_external_bind);
command_line::add_arg(desc, arg.rpc_access_control_origins);
command_line::add_arg(desc, arg.rpc_ssl);
command_line::add_arg(desc, arg.rpc_ssl_private_key);
command_line::add_arg(desc, arg.rpc_ssl_certificate);
command_line::add_arg(desc, arg.rpc_ssl_ca_certificates);
command_line::add_arg(desc, arg.rpc_ssl_allowed_fingerprints);
command_line::add_arg(desc, arg.rpc_ssl_allow_chained);
if (any_cert_option)
command_line::add_arg(desc, arg.rpc_ssl_allow_any_cert);
}
std::optional<rpc_args> rpc_args::process(const boost::program_options::variables_map& vm, const bool any_cert_option)
// Checks an IP address for validity; throws on problem.
static void check_ip(const std::string& ip, bool allow_external, const std::string& option_name) {
boost::system::error_code ec{};
const auto parsed_ip =
#if BOOST_VERSION >= 106600
boost::asio::ip::make_address(ip, ec);
#else
boost::asio::ip::address::from_string(ip, ec);
#endif
if (ec)
throw std::runtime_error{tr("Invalid IP address given for --") + option_name};
if (!parsed_ip.is_loopback() && !allow_external)
throw std::runtime_error{
"--" + option_name +
tr(" permits inbound unencrypted external connections. Consider SSH tunnel or SSL proxy instead. Override with --confirm-external-bind")};
}
rpc_args rpc_args::process(const boost::program_options::variables_map& vm)
{
const descriptors arg{};
rpc_args config{};
@ -139,102 +94,38 @@ namespace cryptonote
config.use_ipv6 = command_line::get_arg(vm, arg.rpc_use_ipv6);
config.require_ipv4 = !command_line::get_arg(vm, arg.rpc_ignore_ipv4);
if (!config.bind_ip.empty())
{
// always parse IP here for error consistency
boost::system::error_code ec{};
const auto parsed_ip = boost::asio::ip::address::from_string(config.bind_ip, ec);
if (ec)
{
LOG_ERROR(tr("Invalid IP address given for --") << arg.rpc_bind_ip.name);
return std::nullopt;
}
check_ip(config.bind_ip, command_line::get_arg(vm, arg.confirm_external_bind), arg.rpc_bind_ip.name);
if (!parsed_ip.is_loopback() && !command_line::get_arg(vm, arg.confirm_external_bind))
{
LOG_ERROR(
"--" << arg.rpc_bind_ip.name <<
tr(" permits inbound unencrypted external connections. Consider SSH tunnel or SSL proxy instead. Override with --") <<
arg.confirm_external_bind.name
);
return std::nullopt;
}
}
if (!config.bind_ipv6_address.empty())
{
// allow square braces, but remove them here if present
if (config.bind_ipv6_address.find('[') != std::string::npos)
{
if (config.bind_ipv6_address.find('[') != std::string::npos && config.bind_ipv6_address.rfind(']') != std::string::npos)
config.bind_ipv6_address = config.bind_ipv6_address.substr(1, config.bind_ipv6_address.size() - 2);
}
// always parse IP here for error consistency
boost::system::error_code ec{};
const auto parsed_ip = boost::asio::ip::address::from_string(config.bind_ipv6_address, ec);
if (ec)
{
LOG_ERROR(tr("Invalid IP address given for --") << arg.rpc_bind_ipv6_address.name);
return std::nullopt;
}
if (!parsed_ip.is_loopback() && !command_line::get_arg(vm, arg.confirm_external_bind))
{
LOG_ERROR(
"--" << arg.rpc_bind_ipv6_address.name <<
tr(" permits inbound unencrypted external connections. Consider SSH tunnel or SSL proxy instead. Override with --") <<
arg.confirm_external_bind.name
);
return std::nullopt;
}
check_ip(config.bind_ipv6_address, command_line::get_arg(vm, arg.confirm_external_bind), arg.rpc_bind_ipv6_address.name);
}
const char *env_rpc_login = nullptr;
const bool has_rpc_arg = command_line::has_arg(vm, arg.rpc_login);
const bool use_rpc_env = !has_rpc_arg && (env_rpc_login = getenv("RPC_LOGIN")) != nullptr && strlen(env_rpc_login) > 0;
std::optional<tools::login> login{};
if (has_rpc_arg || use_rpc_env)
{
config.login = tools::login::parse(
has_rpc_arg ? command_line::get_arg(vm, arg.rpc_login) : std::string(env_rpc_login), true, [](bool verify) {
return tools::password_container::prompt(verify, "RPC server password");
});
auto verify = [](bool verify) { return tools::password_container::prompt(verify, "RPC server password"); };
if (command_line::has_arg(vm, arg.rpc_login))
config.login = tools::login::parse(command_line::get_arg(vm, arg.rpc_login), true, verify);
else if (const char *env_rpc_login = std::getenv("RPC_LOGIN"); env_rpc_login != nullptr && std::strlen(env_rpc_login))
config.login = tools::login::parse(env_rpc_login, true, verify);
if (!config.login)
return std::nullopt;
if (config.login->username.empty())
{
LOG_ERROR(tr("Username specified with --") << arg.rpc_login.name << tr(" cannot be empty"));
return std::nullopt;
}
}
if (config.login && config.login->username.empty())
throw std::runtime_error{tr("Username specified with --") + std::string{arg.rpc_login.name} + " cannot be empty"};
auto access_control_origins_input = command_line::get_arg(vm, arg.rpc_access_control_origins);
if (!access_control_origins_input.empty())
{
// FIXME: this requirement makes no sense.
if (!config.login)
{
LOG_ERROR(arg.rpc_access_control_origins.name << tr(" requires RPC server password --") << arg.rpc_login.name << tr(" cannot be empty"));
return std::nullopt;
}
throw std::runtime_error{"--"s + arg.rpc_access_control_origins.name + tr(" requires RPC server password --") + arg.rpc_login.name + tr(" cannot be empty")};
auto aco_entries = tools::split_any(access_control_origins_input, ", \t", true);
std::vector<std::string> access_control_origins;
boost::split(access_control_origins, access_control_origins_input, boost::is_any_of(","));
std::for_each(access_control_origins.begin(), access_control_origins.end(), [](auto& a) { boost::trim(a, std::locale::classic()); });
config.access_control_origins = std::move(access_control_origins);
access_control_origins.reserve(aco_entries.size());
for (auto& aco : aco_entries) access_control_origins.emplace_back(aco);
}
auto ssl_options = do_process_ssl(vm, arg, any_cert_option);
if (!ssl_options)
return std::nullopt;
config.ssl_options = std::move(*ssl_options);
return {std::move(config)};
}
std::optional<epee::net_utils::ssl_options_t> rpc_args::process_ssl(const boost::program_options::variables_map& vm, const bool any_cert_option)
{
const descriptors arg{};
return do_process_ssl(vm, arg, any_cert_option);
return config;
}
}

View File

@ -35,7 +35,6 @@
#include "common/command_line.h"
#include "common/password.h"
#include "net/net_ssl.h"
namespace cryptonote
{
@ -58,28 +57,16 @@ namespace cryptonote
const command_line::arg_descriptor<std::string> rpc_login;
const command_line::arg_descriptor<bool> confirm_external_bind;
const command_line::arg_descriptor<std::string> rpc_access_control_origins;
const command_line::arg_descriptor<std::string> rpc_ssl;
const command_line::arg_descriptor<std::string> rpc_ssl_private_key;
const command_line::arg_descriptor<std::string> rpc_ssl_certificate;
const command_line::arg_descriptor<std::string> rpc_ssl_ca_certificates;
const command_line::arg_descriptor<std::vector<std::string>> rpc_ssl_allowed_fingerprints;
const command_line::arg_descriptor<bool> rpc_ssl_allow_chained;
const command_line::arg_descriptor<bool> rpc_ssl_allow_any_cert;
const command_line::arg_descriptor<bool> rpc_public_node;
const command_line::arg_descriptor<std::string> zmq_rpc_bind_ip; // Deprecated & ignored
const command_line::arg_descriptor<std::string> zmq_rpc_bind_port; // Deprecated & ignored
};
// `allow_any_cert` bool toggles `--rpc-ssl-allow-any-cert` configuration
static const char* tr(const char* str);
static void init_options(boost::program_options::options_description& desc, const bool any_cert_option = false);
static void init_options(boost::program_options::options_description& desc);
//! \return Arguments specified by user, or `std::nullopt` if error
static std::optional<rpc_args> process(const boost::program_options::variables_map& vm, const bool any_cert_option = false);
//! \return SSL arguments specified by user, or `std::nullopt` if error
static std::optional<epee::net_utils::ssl_options_t> process_ssl(const boost::program_options::variables_map& vm, const bool any_cert_option = false);
//! \return Arguments specified by user. Throws on error.
static rpc_args process(const boost::program_options::variables_map& vm);
std::string bind_ip;
std::string bind_ipv6_address;
@ -87,6 +74,5 @@ namespace cryptonote
bool require_ipv4;
std::vector<std::string> access_control_origins;
std::optional<tools::login> login; // currently `std::nullopt` if unspecified by user
epee::net_utils::ssl_options_t ssl_options = epee::net_utils::ssl_support_t::e_ssl_support_enabled;
};
}

View File

@ -46,7 +46,6 @@
#include "wallet2.h"
#include "cryptonote_basic/cryptonote_format_utils.h"
#include "rpc/core_rpc_server_commands_defs.h"
#include "rpc/core_rpc_server.h"
#include "misc_language.h"
#include "cryptonote_basic/cryptonote_basic_impl.h"
#include "cryptonote_basic/hardfork.h"
@ -2990,7 +2989,12 @@ bool wallet2::long_poll_pool_state()
// Update daemon address for long polling here instead of in set_daemon which
// could block on the long polling connection thread.
static bool local_address = true;
std::chrono::milliseconds local_timeout = 500ms;
// How long we wait before retrying the connection if we get an error
constexpr auto local_retry = 500ms;
constexpr auto remote_retry = 3s;
// How long we wait for a long poll response before timing out; we add a 5s buffer to the usual
// timeout to allow for network latency and lokid response time.
constexpr auto long_poll_timeout = cryptonote::rpc::GET_TRANSACTION_POOL_HASHES_BIN::long_poll_timeout + 5s;
{
std::string new_host;
std::optional<epee::net_utils::http::login> login;
@ -3020,20 +3024,22 @@ bool wallet2::long_poll_pool_state()
req.tx_pool_checksum = get_long_poll_tx_pool_checksum();
bool r = false;
{
constexpr auto timeout = cryptonote::rpc::GET_TRANSACTION_POOL_HASHES_BIN::long_poll_timeout + 5s;
std::lock_guard<decltype(m_long_poll_mutex)> lock(m_long_poll_mutex);
r = epee::net_utils::invoke_http_bin("/" + std::string{rpc::GET_TRANSACTION_POOL_HASHES_BIN::names().front()},
req,
res,
m_long_poll_client,
local_address ? local_timeout : cryptonote::rpc::long_poll_timeout,
timeout,
"GET");
}
bool maxed_out_connections = res.status == rpc::STATUS_TX_LONG_POLL_MAX_CONNECTIONS;
bool timed_out = res.status == rpc::STATUS_TX_LONG_POLL_TIMED_OUT;
if (maxed_out_connections || timed_out)
if (!r || maxed_out_connections || timed_out)
{
if (maxed_out_connections) std::this_thread::sleep_for(local_address ? local_timeout : cryptonote::rpc::long_poll_timeout);
MINFO("Long poll " << (!r ? "request error" : res.status == rpc::STATUS_TX_LONG_POLL_MAX_CONNECTIONS ? "replied with max connections" : "replied with no pool change"));
if (!r || maxed_out_connections) std::this_thread::sleep_for(local_address ? local_retry : remote_retry);
return false;
}

View File

@ -181,9 +181,13 @@ namespace tools
//------------------------------------------------------------------------------------------------------------------------------
bool wallet_rpc_server::init()
{
auto rpc_config = cryptonote::rpc_args::process(m_vm);
if (!rpc_config)
cryptonote::rpc_args rpc_config;
try {
rpc_config = cryptonote::rpc_args::process(m_vm);
} catch (const std::exception& e) {
MERROR("Failed to process rpc arguments: " << e.what());
return false;
}
std::optional<epee::net_utils::http::login> http_login{};
std::string bind_port = command_line::get_arg(m_vm, arg_rpc_bind_port);
@ -215,7 +219,7 @@ namespace tools
if (disable_auth)
{
if (rpc_config->login)
if (rpc_config.login)
{
const cryptonote::rpc_args::descriptors arg{};
LOG_ERROR(tr("Cannot specify --") << arg_disable_rpc_login.name << tr(" and --") << arg.rpc_login.name);
@ -224,7 +228,7 @@ namespace tools
}
else // auth enabled
{
if (!rpc_config->login)
if (!rpc_config.login)
{
std::array<std::uint8_t, 16> rand_128bit{{}};
crypto::rand(rand_128bit.size(), rand_128bit.data());
@ -255,7 +259,7 @@ namespace tools
else // chosen user/pass
{
http_login.emplace(
std::move(rpc_config->login->username), std::move(rpc_config->login->password).password()
std::move(rpc_config.login->username), std::move(rpc_config.login->password).password()
);
}
assert(bool(http_login));
@ -267,10 +271,9 @@ namespace tools
m_net_server.set_threads_prefix("RPC");
auto rng = [](size_t len, uint8_t *ptr) { return crypto::rand(len, ptr); };
return epee::http_server_impl_base<wallet_rpc_server, connection_context>::init(
rng, std::move(bind_port), std::move(rpc_config->bind_ip),
std::move(rpc_config->bind_ipv6_address), std::move(rpc_config->use_ipv6), std::move(rpc_config->require_ipv4),
std::move(rpc_config->access_control_origins), std::move(http_login),
std::move(rpc_config->ssl_options)
rng, std::move(bind_port), std::move(rpc_config.bind_ip),
std::move(rpc_config.bind_ipv6_address), std::move(rpc_config.use_ipv6), std::move(rpc_config.require_ipv4),
std::move(rpc_config.access_control_origins), std::move(http_login)
);
}
//------------------------------------------------------------------------------------------------------------------------------

View File

@ -917,6 +917,7 @@ inline bool do_replay_events_get_core(std::vector<test_event_entry>& events, cry
{
boost::program_options::options_description desc("Allowed options");
cryptonote::core::init_options(desc);
cryptonote::long_poll_trigger = [](cryptonote::tx_memory_pool&) {};
boost::program_options::variables_map vm;
bool r = command_line::handle_error_helper(desc, [&]()
{