Make startup more synchronous; oxenmq update

This does better synchronization at startup:

- OMQ listening ports now aren't opened until we have processed a block
update from oxend.  (Fixes #358)
- HTTPS is similarly delayed
- port status messages are now shown when opening, instead of from
main().

This updates oxen-mq to latest dev (needed to support after-start()
listening), which also brings a potential connection mismatch fix.
This commit is contained in:
Jason Rhinelander 2021-06-23 17:53:11 -03:00
parent fb1df09486
commit fdd83f954f
8 changed files with 111 additions and 41 deletions

View file

@ -133,15 +133,23 @@ HTTPSServer::HTTPSServer(
bool required_bind_failed = false;
for (const auto& [addr, port, required] : bind)
https.listen(addr, port, LIBUS_LISTEN_EXCLUSIVE_PORT,
[&listening, req=required, &required_bind_failed](us_listen_socket_t* sock) {
if (sock) listening.push_back(sock);
else if (req) required_bind_failed = true;
[&listening, req=required, &required_bind_failed, addr=fmt::format("{}:{}", addr, port)]
(us_listen_socket_t* sock) {
if (sock) {
OXEN_LOG(info, "HTTPS server listening at {}", addr);
listening.push_back(sock);
} else if (req) {
required_bind_failed = true;
OXEN_LOG(critical, "HTTPS server failed to bind to required address {}", addr);
} else {
OXEN_LOG(warn, "HTTPS server failed to bind to (non-required) address {}", addr);
}
});
if (listening.empty() || required_bind_failed) {
std::ostringstream error;
error << "RPC HTTP server failed to bind; ";
if (listening.empty()) error << "no valid bind address(es) given";
if (listening.empty()) error << "no valid bind address(es) given; ";
error << "tried to bind to:";
for (const auto& [addr, port, required] : bind)
error << ' ' << addr << ':' << port;

View file

@ -115,10 +115,6 @@ int main(int argc, char* argv[]) {
OXEN_LOG(info, "Setting log level to {}", options.log_level);
OXEN_LOG(info, "Setting database location to {}", data_dir);
OXEN_LOG(info, "Connecting to oxend @ {}", options.oxend_omq_rpc);
OXEN_LOG(info, "HTTPS server is listening at {}:{}", options.ip,
options.port);
OXEN_LOG(info, "OxenMQ is listening at {}:{}", options.ip,
options.omq_port);
if (sodium_init() != 0) {
OXEN_LOG(err, "Could not initialize libsodium");

View file

@ -10,12 +10,16 @@
#include "rate_limiter.h"
#include "request_handler.h"
#include "service_node.h"
#include "string_utils.hpp"
#include <chrono>
#include <exception>
#include <nlohmann/json.hpp>
#include <oxenmq/bt_serialize.h>
#include <oxenmq/hex.h>
#include <optional>
#include <stdexcept>
#include <variant>
namespace oxen {
@ -391,15 +395,6 @@ OxenmqServer::OxenmqServer(
for (const auto& key : stats_access_keys)
stats_access_keys_.emplace(key.view());
OXEN_LOG(info, "OxenMQ is listenting on port {}", me.omq_port);
omq_.listen_curve(
fmt::format("tcp://0.0.0.0:{}", me.omq_port),
[this](std::string_view /*addr*/, std::string_view pk, bool /*sn*/) {
return stats_access_keys_.count(std::string{pk})
? oxenmq::AuthLevel::admin : oxenmq::AuthLevel::none;
});
// clang-format off
// Endpoints invoked by other SNs
@ -447,28 +442,74 @@ OxenmqServer::OxenmqServer(
// Connects to the oxend instance at `oxend_rpc`.  The retry loop below blocks the calling thread
// until a connection attempt succeeds (storing the connection id in oxend_conn_), pausing 500ms
// after each failed attempt and logging the total time taken once connected.
// NOTE(review): this span is a rendered diff hunk, so the callback-based
// `oxend_conn_ = omq_.connect_remote(...)` statement appears directly ahead of the blocking loop;
// presumably the former is the removed implementation and the latter its replacement -- confirm
// against the real file.
void OxenmqServer::connect_oxend(const oxenmq::address& oxend_rpc) {
// Establish our persistent connection to oxend.
oxend_conn_ = omq_.connect_remote(oxend_rpc,
[this](auto&&) {
OXEN_LOG(info, "connection to oxend established");
service_node_->on_oxend_connected();
},
[this, oxend_rpc](auto&&, std::string_view reason) {
OXEN_LOG(warn, "failed to connect to local oxend @ {}: {}; retrying", oxend_rpc, reason);
connect_oxend(oxend_rpc);
},
// Turn this off since we are using oxenmq's own key and don't want to replace some existing
// connection to it that might also be using that pubkey:
oxenmq::connect_option::ephemeral_routing_id{},
oxenmq::AuthLevel::admin);
// Start time of the first attempt; only used for the "Connected to oxend in ..." log line.
auto start = std::chrono::steady_clock::now();
while (true) {
// One promise per attempt: the success callback fulfils it with true, the failure callback
// with false.  Both callbacks capture `prom` (and the failure callback `oxend_rpc`) by
// reference, which relies on this thread blocking on the future below until one of them fires.
std::promise<bool> prom;
OXEN_LOG(info, "Establishing connection to oxend...");
omq_.connect_remote(oxend_rpc,
[this, &prom](auto cid) { oxend_conn_ = cid; prom.set_value(true); },
[&prom, &oxend_rpc](auto&&, std::string_view reason) {
OXEN_LOG(warn, "failed to connect to local oxend @ {}: {}; retrying", oxend_rpc, reason);
prom.set_value(false);
},
// Turn this off since we are using oxenmq's own key and don't want to replace some existing
// connection to it that might also be using that pubkey:
oxenmq::connect_option::ephemeral_routing_id{},
oxenmq::AuthLevel::admin);
// Wait for the outcome of this attempt; true means we are connected and can stop retrying.
if (prom.get_future().get()) {
OXEN_LOG(info, "Connected to oxend in {}",
util::short_duration(std::chrono::steady_clock::now() - start));
break;
}
// Attempt failed: pause briefly before trying again.
std::this_thread::sleep_for(500ms);
}
}
// Stores the server's collaborators and brings OxenMQ online, blocking the caller at each stage:
//   1. start OxenMQ and connect to oxend (connect_oxend);
//   2. wait for the initial block update from oxend (ServiceNode::on_oxend_connected);
//   3. open the curve listener on our own omq_port and wait for the bind result.
// The HTTPS listener is not started here; that happens in main() after this returns.
//
// sn, rh, rl - stored in service_node_, request_handler_ and rate_limiter_ respectively.
// oxend_rpc  - address passed on to connect_oxend().
//
// Throws std::runtime_error if the OxenMQ listener reports that it failed to bind.
void OxenmqServer::init(ServiceNode* sn, RequestHandler* rh, RateLimiter* rl, oxenmq::address oxend_rpc) {
    // service_node_ must still be unset when init() is called.
    assert(!service_node_);
    service_node_ = sn;
    request_handler_ = rh;
    rate_limiter_ = rl;
    omq_.start();

    // Block until we are connected to oxend:
    connect_oxend(oxend_rpc);

    // Block until we get a block update from oxend:
    service_node_->on_oxend_connected();

    // Start the OMQ listener.
    const auto& me = service_node_->own_address();
    OXEN_LOG(info, "Starting listening for OxenMQ connections on port {}", me.omq_port);

    // The bind result is delivered through a callback, so it is relayed back to this thread via
    // a promise/future pair.  The promise is held through a shared_ptr so that the callback
    // carries its own handle to it.
    auto omq_prom = std::make_shared<std::promise<void>>();
    auto omq_future = omq_prom->get_future();
    omq_.listen_curve(
        fmt::format("tcp://0.0.0.0:{}", me.omq_port),
        // Public keys listed in stats_access_keys_ get admin level; everything else gets none.
        [this](std::string_view /*addr*/, std::string_view pk, bool /*sn*/) {
            return stats_access_keys_.count(std::string{pk})
                ? oxenmq::AuthLevel::admin : oxenmq::AuthLevel::none;
        },
        [prom=std::move(omq_prom)](bool listen_success) {
            if (listen_success)
                prom->set_value();
            else
                // Signal failure with an (empty-message) runtime_error; the real message is built
                // by the waiting thread below.
                prom->set_exception(std::make_exception_ptr(std::runtime_error{""}));
        });

    try {
        omq_future.get();
    } catch (const std::runtime_error&) {
        auto msg = fmt::format("OxenMQ server failed to bind to port {}", me.omq_port);
        OXEN_LOG(critical, msg);
        throw std::runtime_error{msg};
    }

    // The https server startup happens in main(), after we return
}
std::string OxenmqServer::encode_onion_data(std::string_view payload, const OnionRequestMetadata& data) {

View file

@ -87,6 +87,8 @@ class OxenmqServer {
// Access pubkeys for the 'service' command category (for access stats & logs), in binary.
std::unordered_set<std::string> stats_access_keys_;
// Connects (and blocks until connected) to oxend. When this returns an oxend connection will
// be available (and oxend_conn_ will be set to the connection id to reach it).
void connect_oxend(const oxenmq::address& oxend_rpc);
public:
@ -95,7 +97,8 @@ class OxenmqServer {
const x25519_seckey& privkey,
const std::vector<x25519_pubkey>& stats_access_keys_hex);
// Initialize oxenmq
// Initialize oxenmq; this blocks until we have connected to oxend, received the initial block
// update from it, and started the OMQ listener.
void init(ServiceNode* sn, RequestHandler* rh, RateLimiter* rl, oxenmq::address oxend_rpc);
/// Dereferencing via * or -> accesses the contained OxenMQ instance.

View file

@ -12,6 +12,7 @@
#include "version.h"
#include <boost/endian/conversion.hpp>
#include <chrono>
#include <cpr/cpr.h>
#include <mutex>
#include <nlohmann/json.hpp>
@ -80,11 +81,22 @@ ServiceNode::ServiceNode(
}
// Runs the work that needs a live oxend connection -- an immediate swarm update and oxend ping,
// plus the recurring oxend-ping and peer-ping timers -- and then blocks the caller until
// got_first_response_ becomes true (signalled through first_response_cv_).  A warning is logged
// for every 5 seconds spent waiting, and the total wait time is logged once the flag is set.
void ServiceNode::on_oxend_connected() {
    const auto wait_began = std::chrono::steady_clock::now();

    update_swarms();
    oxend_ping();
    omq_server_->add_timer([this] { oxend_ping(); }, OXEND_PING_INTERVAL);
    omq_server_->add_timer([this] { ping_peers(); },
            reachability_testing::TESTING_TIMER_INTERVAL);

    std::unique_lock guard{first_response_mutex_};
    const auto have_response = [this] { return got_first_response_; };
    // wait_for returns false when the 5s timeout expires with the flag still unset; keep
    // waiting (and nagging) until it reports true.
    while (!first_response_cv_.wait_for(guard, 5s, have_response)) {
        OXEN_LOG(warn, "Still waiting for initial block update from oxend...");
    }

    OXEN_LOG(info, "Got initial block update from oxend in {}", util::short_duration(
                std::chrono::steady_clock::now() - wait_began));
}
template <typename T>
@ -543,7 +555,7 @@ void ServiceNode::update_swarms() {
return;
}
std::lock_guard guard(sn_mutex_);
std::lock_guard lock{sn_mutex_};
OXEN_LOG(debug, "Swarm update triggered");
@ -575,13 +587,16 @@ void ServiceNode::update_swarms() {
return;
}
try {
std::lock_guard guard(sn_mutex_);
std::lock_guard lock{sn_mutex_};
block_update bu = parse_swarm_update(data[1]);
if (!got_first_response_) {
OXEN_LOG(
info,
"Got initial swarm information from local Oxend");
got_first_response_ = true;
OXEN_LOG(info, "Got initial swarm information from local Oxend");
{
std::lock_guard l{first_response_mutex_};
got_first_response_ = true;
}
first_response_cv_.notify_all();
#ifndef INTEGRATION_TEST
// If this is our very first response then we *may* want to try falling back to

View file

@ -67,6 +67,8 @@ class ServiceNode {
bool syncing_ = true;
bool active_ = false;
bool got_first_response_ = false;
std::condition_variable first_response_cv_;
std::mutex first_response_mutex_;
bool force_start_ = false;
std::atomic<bool> shutting_down_ = false;
hf_revision hardfork_ = {0, 0};
@ -282,7 +284,8 @@ class ServiceNode {
}
// Called once we have established the initial connection to our local oxend to set up initial
// data and timers that rely on an oxend connection.
// data and timers that rely on an oxend connection. This blocks until we get an initial
// service node block update back from oxend.
void on_oxend_connected();
// Called when oxend notifies us of a new block to update swarm info

View file

@ -88,10 +88,14 @@ std::string short_duration(std::chrono::duration<double> dur) {
if (dur >= 1s)
return fmt::format("{:.1f}s", dur / 1s);
if (dur >= 1ms)
if (dur >= 100ms)
return fmt::format(u8"{:.0f}ms", dur / 1ms);
if (dur >= 1us)
if (dur >= 1ms)
return fmt::format(u8"{:.1f}ms", dur / 1ms);
if (dur >= 100us)
return fmt::format(u8"{:.0f}µs", dur / 1us);
if (dur >= 1us)
return fmt::format(u8"{:.1f}µs", dur / 1us);
if (dur >= 1ns)
return fmt::format(u8"{:.0f}ns", dur / 1ns);
return "0s";

2
vendors/oxen-mq vendored

@ -1 +1 @@
Subproject commit dccbd1e8cdb9f57206077facdf973c8e86fc6bec
Subproject commit ad04c53c0e90f904be100dac422a68676648c867