Compare commits

...

4 Commits

Author SHA1 Message Date
dr7ana d6e5aca572 libquic vbump 2023-12-13 10:36:09 -08:00
dr7ana fd21eb3a00 Big fix!
- pending conns container stops them from being counted towards active conns in the interim
- un-abstracted pendingmessages vs pendingdatamessages vs pendingcontrolmessages (gross)
- fixed bootstrap fetching and storage!
2023-12-13 09:42:09 -08:00
dr7ana d016951d2f Fixed pending message queue weirdness 2023-12-13 05:49:59 -08:00
dr7ana fbc71847ef libquic vbump 2023-12-13 05:09:23 -08:00
11 changed files with 284 additions and 230 deletions

@ -1 +1 @@
Subproject commit 3ace46701449c4c01f74aae2e0be3b4164768911
Subproject commit 0e431b912eb4bf76a9219861afc06cd8ceafa781

View File

@ -5,7 +5,7 @@ namespace llarp::link
Connection::Connection(
const std::shared_ptr<oxen::quic::connection_interface>& c,
std::shared_ptr<oxen::quic::BTRequestStream>& s)
: conn{c}, control_stream{s}/* , remote_rc{std::move(rc)} */
: conn{c}, control_stream{s} /* , remote_rc{std::move(rc)} */
{}
} // namespace llarp::link

View File

@ -11,7 +11,6 @@ namespace llarp::link
{
std::shared_ptr<oxen::quic::connection_interface> conn;
std::shared_ptr<oxen::quic::BTRequestStream> control_stream;
// std::optional<RemoteRC> remote_rc;
// one side of a connection will be responsible for some things, e.g. heartbeat
bool inbound{false};

View File

@ -24,7 +24,7 @@ namespace llarp
std::shared_ptr<link::Connection>
Endpoint::get_conn(const RemoteRC& rc) const
{
if (auto itr = conns.find(rc.router_id()); itr != conns.end())
if (auto itr = active_conns.find(rc.router_id()); itr != active_conns.end())
return itr->second;
return nullptr;
@ -33,7 +33,7 @@ namespace llarp
std::shared_ptr<link::Connection>
Endpoint::get_conn(const RouterID& rid) const
{
if (auto itr = conns.find(rid); itr != conns.end())
if (auto itr = active_conns.find(rid); itr != active_conns.end())
return itr->second;
return nullptr;
@ -42,7 +42,7 @@ namespace llarp
bool
Endpoint::have_conn(const RouterID& remote, bool client_only) const
{
if (auto itr = conns.find(remote); itr != conns.end())
if (auto itr = active_conns.find(remote); itr != active_conns.end())
{
if (not(itr->second->remote_is_relay and client_only))
return true;
@ -56,7 +56,7 @@ namespace llarp
{
size_t count = 0;
for (const auto& c : conns)
for (const auto& c : active_conns)
{
if (not(c.second->remote_is_relay and clients_only))
count += 1;
@ -68,13 +68,13 @@ namespace llarp
bool
Endpoint::get_random_connection(RemoteRC& router) const
{
if (const auto size = conns.size(); size)
if (const auto size = active_conns.size(); size)
{
auto itr = conns.begin();
auto itr = active_conns.begin();
std::advance(itr, randint() % size);
RouterID rid{itr->second->conn->remote_key()};
if (auto maybe = link_manager.node_db->get_rc(rid))
{
router = *maybe;
@ -91,7 +91,7 @@ namespace llarp
void
Endpoint::for_each_connection(std::function<void(link::Connection&)> func)
{
for (const auto& [rid, conn] : conns)
for (const auto& [rid, conn] : active_conns)
func(*conn);
}
@ -99,14 +99,14 @@ namespace llarp
Endpoint::close_connection(RouterID _rid)
{
assert(link_manager._router.loop()->inEventLoop());
auto itr = conns.find(_rid);
if (itr != conns.end())
auto itr = active_conns.find(_rid);
if (itr != active_conns.end())
return;
auto& conn = *itr->second->conn;
conn.close_connection();
connid_map.erase(conn.scid());
conns.erase(itr);
active_conns.erase(itr);
}
} // namespace link
@ -125,8 +125,15 @@ namespace llarp
void
LinkManager::register_commands(std::shared_ptr<oxen::quic::BTRequestStream>& s)
{
assert(ep.connid_map.count(s->conn_id()));
const RouterID& router_id = ep.connid_map[s->conn_id()];
log::critical(logcat, "{} called", __PRETTY_FUNCTION__);
const RouterID& router_id {s->conn.remote_key()};
log::critical(logcat, "Registering commands (RID:{})", router_id);
s->register_command("bfetch_rcs"s, [this](oxen::quic::message m) {
_router.loop()->call(
[this, msg = std::move(m)]() mutable { handle_fetch_bootstrap_rcs(std::move(msg)); });
});
s->register_command("path_build"s, [this, rid = router_id](oxen::quic::message m) {
_router.loop()->call(
@ -143,15 +150,11 @@ namespace llarp
[this, msg = std::move(m)]() mutable { handle_gossip_rc(std::move(msg)); });
});
s->register_command("bfetch_rcs"s, [this](oxen::quic::message m) {
_router.loop()->call(
[this, msg = std::move(m)]() mutable { handle_fetch_bootstrap_rcs(std::move(msg)); });
});
for (auto& method : direct_requests)
{
s->register_command(
std::string{method.first}, [this, func = std::move(method.second)](oxen::quic::message m) {
std::string{method.first},
[this, func = std::move(method.second)](oxen::quic::message m) {
_router.loop()->call([this, msg = std::move(m), func = std::move(func)]() mutable {
auto body = msg.body_str();
auto respond = [m = std::move(msg)](std::string response) mutable {
@ -161,6 +164,24 @@ namespace llarp
});
});
}
log::critical(logcat, "Registered all commands! (RID:{})", router_id);
}
LinkManager::LinkManager(Router& r)
: _router{r}
, quic{std::make_unique<oxen::quic::Network>()}
, tls_creds{oxen::quic::GNUTLSCreds::make_from_ed_keys(
{reinterpret_cast<const char*>(_router.identity().data()), size_t{32}},
{reinterpret_cast<const char*>(_router.identity().toPublic().data()), size_t{32}})}
, ep{startup_endpoint(), *this}
{}
std::unique_ptr<LinkManager>
LinkManager::make(Router& r)
{
std::unique_ptr<LinkManager> p{new LinkManager(r)};
return p;
}
std::shared_ptr<oxen::quic::Endpoint>
@ -183,25 +204,26 @@ namespace llarp
},
[this](oxen::quic::dgram_interface& di, bstring dgram) { recv_data_message(di, dgram); });
tls_creds->set_key_verify_callback([this](const ustring_view& key, const ustring_view&) {
bool result = false;
bool result = true;
RouterID other{key.data()};
if (_router.is_bootstrap_seed())
{
if (node_db->whitelist().count(other))
// FIXME: remove "|| true", this is just for local testing!
if (node_db->whitelist().count(other) || true)
{
log::critical(logcat, "Saving bootstrap seed requester...");
auto [it, b] = node_db->seeds().emplace(other);
result &= b;
result |= b;
}
log::critical(
logcat,
"Bootstrap seed node was {} to confirm fetch requester is white-listed; saving RID",
result ? "able" : "unable");
"Bootstrap seed node was {} to confirm fetch requester is white-listed; {}successfully saved RID",
result ? "able" : "unable", result ? "" : "un");
return result;
}
if (node_db->has_rc(other))
result = true;
result = node_db->has_rc(other);
log::critical(
logcat, "{}uccessfully verified connection to {}!", result ? "S" : "Uns", other);
@ -214,8 +236,10 @@ namespace llarp
[&](oxen::quic::Connection& c,
oxen::quic::Endpoint& e,
std::optional<int64_t> id) -> std::shared_ptr<oxen::quic::Stream> {
if (id && id == 0)
if (id && *id == 0)
{
log::critical(logcat, "Stream constructor constructing BTStream (ID:{})", id);
auto s = e.make_shared<oxen::quic::BTRequestStream>(
c, e, [](oxen::quic::Stream& s, uint64_t error_code) {
log::warning(
@ -228,28 +252,133 @@ namespace llarp
return s;
}
log::critical(logcat, "Stream constructor constructing Stream (ID:{})!", id);
return e.make_shared<oxen::quic::Stream>(c, e);
});
}
return ep;
}
LinkManager::LinkManager(Router& r)
: _router{r}
, quic{std::make_unique<oxen::quic::Network>()}
, tls_creds{oxen::quic::GNUTLSCreds::make_from_ed_keys(
{reinterpret_cast<const char*>(_router.identity().data()), size_t{32}},
{reinterpret_cast<const char*>(_router.identity().toPublic().data()), size_t{32}})}
, ep{startup_endpoint(), *this}
{}
std::unique_ptr<LinkManager>
LinkManager::make(Router& r)
void
LinkManager::on_inbound_conn(oxen::quic::connection_interface& ci)
{
std::unique_ptr<LinkManager> p{new LinkManager(r)};
return p;
const auto& scid = ci.scid();
RouterID rid{ci.remote_key()};
ep.connid_map.emplace(scid, rid);
auto [itr, b] = ep.active_conns.emplace(rid, nullptr);
log::critical(logcat, "Queueing BTStream to be opened...");
auto control_stream = ci.queue_stream<oxen::quic::BTRequestStream>([](oxen::quic::Stream& s,
uint64_t error_code) {
log::warning(
logcat, "BTRequestStream closed unexpectedly (ec:{}); closing connection...", error_code);
s.conn.close_connection(error_code);
});
log::critical(logcat, "Queued BTStream to be opened ID:{}", control_stream->stream_id());
assert(control_stream->stream_id() == 0);
// register_commands(control_stream);
itr->second = std::make_shared<link::Connection>(ci.shared_from_this(), control_stream);
log::critical(logcat, "Successfully configured inbound connection fom {}...", rid);
}
// TODO: should we add routes here now that Router::SessionOpen is gone?
void
LinkManager::on_conn_open(oxen::quic::connection_interface& ci)
{
_router.loop()->call([this, &conn_interface = ci]() {
const auto rid = RouterID{conn_interface.remote_key()};
const auto& remote = conn_interface.remote();
const auto& scid = conn_interface.scid();
if (conn_interface.is_inbound())
{
log::critical(logcat, "Inbound connection fom {} (remote:{})", rid, remote);
on_inbound_conn(conn_interface);
}
else
{
if (auto itr = ep.pending_conns.find(rid); itr != ep.pending_conns.end())
{
ep.connid_map.emplace(scid, rid);
auto [it, b] = ep.active_conns.emplace(rid, nullptr);
it->second = std::move(itr->second);
log::critical(logcat, "Connection to RID:{} moved from pending to active conns!", rid);
}
else
throw std::runtime_error{"Could not find newly established connection in pending conns!"};
}
log::critical(
logcat,
"SERVICE NODE (RID:{}) ESTABLISHED CONNECTION TO RID:{}",
_router.local_rid(),
rid);
// check to see if this connection was established while we were attempting to queue
// messages to the remote
if (auto itr = pending_conn_msg_queue.find(rid); itr != pending_conn_msg_queue.end())
{
log::critical(logcat, "Clearing pending queue for RID:{}", rid);
auto& que = itr->second;
while (not que.empty())
{
auto& msg = que.front();
if (msg.is_control)
{
log::critical(logcat, "Dispatching {} request!", *msg.endpoint);
ep.active_conns[rid]->control_stream->command(
std::move(*msg.endpoint), std::move(msg.body), std::move(msg.func));
}
else
{
conn_interface.send_datagram(std::move(msg.body));
}
que.pop_front();
}
return;
}
log::warning(logcat, "No pending queue to clear for RID:{}", rid);
});
};
void
LinkManager::on_conn_closed(oxen::quic::connection_interface& ci, uint64_t ec)
{
_router.loop()->call([this, &conn_interface = ci, error_code = ec]() {
const auto& scid = conn_interface.scid();
log::critical(quic_cat, "Purging quic connection CID:{} (ec: {})", scid, error_code);
if (const auto& c_itr = ep.connid_map.find(scid); c_itr != ep.connid_map.end())
{
const auto& rid = c_itr->second;
// if (auto maybe = rids_pending_verification.find(rid);
// maybe != rids_pending_verification.end())
// rids_pending_verification.erase(maybe);
// in case this didn't clear earlier, do it now
if (auto p_itr = pending_conn_msg_queue.find(rid); p_itr != pending_conn_msg_queue.end())
pending_conn_msg_queue.erase(p_itr);
if (auto m_itr = ep.active_conns.find(rid); m_itr != ep.active_conns.end())
ep.active_conns.erase(m_itr);
ep.connid_map.erase(c_itr);
log::critical(quic_cat, "Quic connection CID:{} purged successfully", scid);
}
});
}
bool
LinkManager::send_control_message(
const RouterID& remote,
@ -291,12 +420,24 @@ namespace llarp
endpoint = std::move(endpoint),
body = std::move(body),
f = std::move(func)]() {
auto pending = PendingControlMessage(std::move(body), std::move(endpoint), f);
auto pending = PendingMessage(std::move(body), std::move(endpoint), std::move(f));
if (auto it1 = ep.pending_conns.find(remote); it1 != ep.pending_conns.end())
{
if (auto it2 = pending_conn_msg_queue.find(remote); it2 != pending_conn_msg_queue.end())
{
it2->second.push_back(std::move(pending));
log::critical(logcat, "Connection (RID:{}) is pending; message appended to send queue!", remote);
}
}
else
{
log::critical(logcat, "Connection (RID:{}) not found in pending conns; creating send queue!", remote);
auto [itr, b] = pending_conn_msg_queue.emplace(remote, MessageQueue());
itr->second.push_back(std::move(pending));
connect_to(remote);
}
auto [itr, b] = pending_conn_msg_queue.emplace(remote, MessageQueue());
itr->second.push_back(std::move(pending));
connect_to(remote);
});
return false;
@ -315,7 +456,7 @@ namespace llarp
}
_router.loop()->call([this, body = std::move(body), remote]() {
auto pending = PendingDataMessage(body);
auto pending = PendingMessage(std::move(body));
auto [itr, b] = pending_conn_msg_queue.emplace(remote, MessageQueue());
itr->second.push_back(std::move(pending));
@ -385,107 +526,6 @@ namespace llarp
log::warning(quic_cat, "Failed to begin establishing connection to {}", remote_addr);
}
void
LinkManager::on_inbound_conn(oxen::quic::connection_interface& ci)
{
const auto& scid = ci.scid();
RouterID rid{ci.remote_key()};
ep.connid_map.emplace(scid, rid);
auto [itr, b] = ep.conns.emplace(rid, nullptr);
auto control_stream = ci.queue_stream<oxen::quic::BTRequestStream>([](oxen::quic::Stream& s,
uint64_t error_code) {
log::warning(
logcat, "BTRequestStream closed unexpectedly (ec:{}); closing connection...", error_code);
s.conn.close_connection(error_code);
});
log::critical(logcat, "Opened BTStream ID:{}", control_stream->stream_id());
itr->second = std::make_shared<link::Connection>(ci.shared_from_this(), control_stream);
log::critical(logcat, "Successfully configured inbound connection fom {}...", rid);
}
// TODO: should we add routes here now that Router::SessionOpen is gone?
void
LinkManager::on_conn_open(oxen::quic::connection_interface& ci)
{
_router.loop()->call([this, &conn_interface = ci]() {
const auto rid = RouterID{conn_interface.remote_key()};
const auto& remote = conn_interface.remote();
if (conn_interface.is_inbound())
{
log::critical(logcat, "Inbound connection fom {} (remote:{})", rid, remote);
on_inbound_conn(conn_interface);
}
log::critical(
logcat,
"SERVICE NODE (RID:{}) ESTABLISHED CONNECTION TO RID:{}",
_router.local_rid(),
rid);
// check to see if this connection was established while we were attempting to queue
// messages to the remote
if (auto itr = pending_conn_msg_queue.find(rid); itr != pending_conn_msg_queue.end())
{
log::critical(logcat, "Clearing pending queue for RID:{}", rid);
auto& que = itr->second;
while (not que.empty())
{
auto& m = que.front();
if (m.is_control)
{
auto& msg = reinterpret_cast<PendingControlMessage&>(m);
log::critical(logcat, "Dispatching {} request!", msg.endpoint);
ep.conns[rid]->control_stream->command(msg.endpoint, msg.body, msg.func);
}
else
{
auto& msg = reinterpret_cast<PendingDataMessage&>(m);
conn_interface.send_datagram(std::move(msg.body));
}
que.pop_front();
}
return;
}
log::warning(logcat, "No pending queue to clear for RID:{}", rid);
});
};
void
LinkManager::on_conn_closed(oxen::quic::connection_interface& ci, uint64_t ec)
{
_router.loop()->call([this, &conn_interface = ci, error_code = ec]() {
const auto& scid = conn_interface.scid();
log::critical(quic_cat, "Purging quic connection CID:{} (ec: {})", scid, error_code);
if (const auto& c_itr = ep.connid_map.find(scid); c_itr != ep.connid_map.end())
{
const auto& rid = c_itr->second;
// if (auto maybe = rids_pending_verification.find(rid);
// maybe != rids_pending_verification.end())
// rids_pending_verification.erase(maybe);
// in case this didn't clear earlier, do it now
if (auto p_itr = pending_conn_msg_queue.find(rid); p_itr != pending_conn_msg_queue.end())
pending_conn_msg_queue.erase(p_itr);
if (auto m_itr = ep.conns.find(rid); m_itr != ep.conns.end())
ep.conns.erase(m_itr);
ep.connid_map.erase(c_itr);
log::critical(quic_cat, "Quic connection CID:{} purged successfully", scid);
}
});
}
bool
LinkManager::have_connection_to(const RouterID& remote, bool client_only) const
{
@ -598,7 +638,7 @@ namespace llarp
void
LinkManager::gossip_rc(const RouterID& rc_rid, std::string serialized_rc)
{
for (auto& [rid, conn] : ep.conns)
for (auto& [rid, conn] : ep.active_conns)
{
// don't send back to the owner...
if (rid == rc_rid)
@ -607,7 +647,9 @@ namespace llarp
if (not conn->remote_is_relay)
continue;
send_control_message(rid, "gossip_rc", serialized_rc);
send_control_message(rid, "gossip_rc", serialized_rc, [](oxen::quic::message) mutable {
log::critical(logcat, "PLACEHOLDER FOR GOSSIP RC RESPONSE HANDLER");
});
}
}
@ -647,7 +689,7 @@ namespace llarp
}
log::critical(logcat, "Queuing bootstrap fetch request to {}", source.router_id());
auto pending = PendingControlMessage(std::move(payload), "bfetch_rcs"s, std::move(f));
auto pending = PendingMessage(std::move(payload), "bfetch_rcs"s, std::move(f));
auto [itr, b] = pending_conn_msg_queue.emplace(source.router_id(), MessageQueue());
itr->second.push_back(std::move(pending));
@ -688,22 +730,26 @@ namespace llarp
// TODO: if we are not the seed, how do we check the requester
if (is_seed)
{
// we already insert the
// we already insert the
auto& seeds = node_db->seeds();
if (auto itr = seeds.find(rid); itr != seeds.end())
{
log::critical(logcat, "Bootstrap seed confirmed RID:{} is white-listed seeds; approving fetch request and saving RC!", rid);
log::critical(
logcat,
"Bootstrap seed confirmed RID:{} is white-listed seeds; approving fetch request and "
"saving RC!",
rid);
node_db->put_rc(remote);
}
}
auto& src = is_seed ? node_db->bootstrap_seeds() : node_db->get_known_rcs();
auto& src = node_db->get_known_rcs();
auto count = src.size();
if (count == 0)
{
log::error(logcat, "No {} locally to send!", is_seed ? "bootstrap seeds" : "known RCs");
log::error(logcat, "No known RCs locally to send!");
m.respond(messages::ERROR_RESPONSE, true);
return;
}
@ -895,9 +941,9 @@ namespace llarp
void
LinkManager::handle_find_name_response(oxen::quic::message m)
{
if (m.timed_out)
if (not m)
{
log::info(link_cat, "FindNameMessage timed out!");
log::info(link_cat, "FindNameMessage failed!");
return;
}
@ -1031,7 +1077,7 @@ namespace llarp
"publish_intro",
PublishIntroMessage::serialize(introset, relay_order, is_relayed),
[respond = std::move(respond)](oxen::quic::message m) {
if (m.timed_out)
if (not m)
return; // drop if timed out; requester will have timed out as well
respond(m.body_str());
});
@ -1069,7 +1115,7 @@ namespace llarp
void
LinkManager::handle_publish_intro_response(oxen::quic::message m)
{
if (m.timed_out)
if (not m)
{
log::info(link_cat, "PublishIntroMessage timed out!");
return;
@ -1172,7 +1218,7 @@ namespace llarp
link_cat,
"Relayed FindIntroMessage returned successful response; transmitting to initial "
"requester");
else if (relay_response.timed_out)
else if (not relay_response)
log::critical(
link_cat, "Relayed FindIntroMessage timed out! Notifying initial requester");
else
@ -1199,7 +1245,7 @@ namespace llarp
void
LinkManager::handle_find_intro_response(oxen::quic::message m)
{
if (m.timed_out)
if (not m)
{
log::info(link_cat, "FindIntroMessage timed out!");
return;
@ -1392,7 +1438,7 @@ namespace llarp
"then relaying response");
_router.path_context().put_transit_hop(hop);
}
if (m.timed_out)
if (not m)
log::info(link_cat, "Upstream timed out on path build; relaying timeout");
else
log::info(link_cat, "Upstream returned path build failure; relaying response");
@ -1509,7 +1555,7 @@ namespace llarp
void
LinkManager::handle_obtain_exit_response(oxen::quic::message m)
{
if (m.timed_out)
if (not m)
{
log::info(link_cat, "ObtainExitMessage timed out!");
return;
@ -1587,7 +1633,7 @@ namespace llarp
void
LinkManager::handle_update_exit_response(oxen::quic::message m)
{
if (m.timed_out)
if (not m)
{
log::info(link_cat, "UpdateExitMessage timed out!");
return;
@ -1672,7 +1718,7 @@ namespace llarp
void
LinkManager::handle_close_exit_response(oxen::quic::message m)
{
if (m.timed_out)
if (not m)
{
log::info(link_cat, "CloseExitMessage timed out!");
return;
@ -1824,7 +1870,7 @@ namespace llarp
void
LinkManager::handle_convo_intro(oxen::quic::message m)
{
if (m.timed_out)
if (not m)
{
log::info(link_cat, "Path control message timed out!");
return;

View File

@ -48,9 +48,12 @@ namespace llarp
// for outgoing packets, we route via RouterID; map RouterID->Connection
// for incoming packets, we get a ConnectionID; map ConnectionID->RouterID
std::unordered_map<RouterID, std::shared_ptr<link::Connection>> conns;
std::unordered_map<RouterID, std::shared_ptr<link::Connection>> active_conns;
std::unordered_map<oxen::quic::ConnectionID, RouterID> connid_map;
// for pending connections, cleared in LinkManager::on_conn_open
std::unordered_map<RouterID, std::shared_ptr<link::Connection>> pending_conns;
// TODO: see which of these is actually useful and delete the other
std::shared_ptr<link::Connection>
get_conn(const RemoteRC&) const;
@ -109,27 +112,18 @@ namespace llarp
struct PendingMessage
{
std::string body;
std::optional<std::string> endpoint = std::nullopt;
std::function<void(oxen::quic::message)> func = nullptr;
RouterID rid;
bool is_control{false};
bool is_control = false;
PendingMessage(std::string b, bool control = false) : body{std::move(b)}, is_control{control}
PendingMessage(std::string b) : body{std::move(b)}
{}
};
struct PendingDataMessage : PendingMessage
{
PendingDataMessage(std::string b) : PendingMessage(b)
{}
};
struct PendingControlMessage : PendingMessage
{
std::string endpoint;
std::function<void(oxen::quic::message)> func;
PendingControlMessage(
std::string b, std::string e, std::function<void(oxen::quic::message)> f = nullptr)
: PendingMessage(b, true), endpoint{std::move(e)}, func{std::move(f)}
PendingMessage(
std::string b, std::string ep, std::function<void(oxen::quic::message)> f = nullptr)
: body{std::move(b)}, endpoint{std::move(ep)}, func{std::move(f)}, is_control{true}
{}
};
@ -404,16 +398,14 @@ namespace llarp
{
try
{
log::critical(logcat, "Establishing connection to {}", remote);
const auto& rid = rc.router_id();
log::critical(logcat, "Establishing connection to RID:{}", rid);
auto conn_interface =
endpoint->connect(remote, link_manager.tls_creds, std::forward<Opt>(opts)...);
// emplace immediately for connection open callback to find scid
connid_map.emplace(conn_interface->scid(), rc.router_id());
auto [itr, b] = conns.emplace(rc.router_id(), nullptr);
log::critical(logcat, "Establishing connection to {}...", rc.router_id());
// add to pending conns
auto [itr, b] = pending_conns.emplace(rid, nullptr);
auto control_stream = conn_interface->template get_new_stream<oxen::quic::BTRequestStream>(
[](oxen::quic::Stream& s, uint64_t error_code) {
@ -427,6 +419,7 @@ namespace llarp
link_manager.register_commands(control_stream);
itr->second = std::make_shared<link::Connection>(conn_interface, control_stream);
log::critical(logcat, "Connection to RID:{} added to pending connections...", rid);
return true;
}
catch (...)

View File

@ -379,16 +379,16 @@ namespace llarp
src,
FetchRCMessage::serialize(_router.last_rc_fetch, needed),
[this, src, initial](oxen::quic::message m) mutable {
if (m.timed_out)
if (not m)
{
log::info(logcat, "RC fetch to {} timed out", src);
log::info(logcat, "RC fetch to {} failed!", src);
fetch_rcs_result(initial, true);
return;
}
try
{
oxenc::bt_dict_consumer btdc{m.body()};
// TODO: fix this shit after removing ::timed_out from message type
if (not m)
{
auto reason = btdc.require<std::string_view>(messages::STATUS_KEY);
@ -668,13 +668,14 @@ namespace llarp
_needs_rebootstrap = false;
++bootstrap_attempts;
log::critical(
logcat, "Dispatching BootstrapRC fetch request to {}", _bootstraps.current().view());
log::critical(logcat, "Dispatching BootstrapRC fetch request to {}", fetch_source);
_router.link_manager().fetch_bootstrap_rcs(
rc,
BootstrapFetchMessage::serialize(_router.router_contact, BOOTSTRAP_SOURCE_COUNT),
[this](oxen::quic::message m) mutable {
log::critical(logcat, "Received response to BootstrapRC fetch request...");
if (not m)
{
// ++bootstrap_attempts;
@ -688,7 +689,8 @@ namespace llarp
return;
}
std::set<RouterID> rids;
// std::set<RouterID> rids;
size_t num = 0;
try
{
@ -700,7 +702,8 @@ namespace llarp
while (not btlc.is_finished())
{
auto rc = RemoteRC{btlc.consume_dict_data()};
rids.emplace(rc.router_id());
put_rc(rc);
++num;
}
}
}
@ -723,23 +726,30 @@ namespace llarp
// next call to fallback_to_bootstrap() and hit the base case, rotating sources
// bootstrap_attempts = MAX_BOOTSTRAP_FETCH_ATTEMPTS;
if (rids.size() == BOOTSTRAP_SOURCE_COUNT)
{
known_rids.merge(rids);
fetch_initial();
}
else
{
// ++bootstrap_attempts;
log::warning(
logcat,
"BootstrapRC fetch response from {} returned insufficient number of RC's (error "
"{}/{})",
fetch_source,
bootstrap_attempts,
MAX_BOOTSTRAP_FETCH_ATTEMPTS);
fallback_to_bootstrap();
}
// const auto& num = rids.size();
log::critical(logcat, "BootstrapRC fetch response from {} returned {}/{} needed RCs", fetch_source, num, BOOTSTRAP_SOURCE_COUNT);
// known_rids.merge(rids);
fetch_initial();
// FIXME: when moving to testnet, uncomment this
// if (rids.size() == BOOTSTRAP_SOURCE_COUNT)
// {
// known_rids.merge(rids);
// fetch_initial();
// }
// else
// {
// // ++bootstrap_attempts;
// log::warning(
// logcat,
// "BootstrapRC fetch response from {} returned insufficient number of RC's (error "
// "{}/{})",
// fetch_source,
// bootstrap_attempts,
// MAX_BOOTSTRAP_FETCH_ATTEMPTS);
// fallback_to_bootstrap();
// }
});
}
@ -954,9 +964,6 @@ namespace llarp
{
const auto& rid = rc.router_id();
if (not want_rc(rid))
return false;
known_rcs.erase(rc);
rc_lookup.erase(rid);
@ -974,6 +981,12 @@ namespace llarp
return known_rcs.size();
}
size_t
NodeDB::num_rids() const
{
return known_rids.size();
}
bool
NodeDB::put_rc_if_newer(RemoteRC rc, rc_time now)
{

View File

@ -403,6 +403,9 @@ namespace llarp
size_t
num_rcs() const;
size_t
num_rids() const;
/// do periodic tasks like flush to disk and expiration
void
Tick(llarp_time_t now);

View File

@ -118,7 +118,7 @@ namespace llarp::path
if ((not self) or (not response_cb))
return;
if (m.timed_out)
if (not m)
{
response_cb(messages::TIMEOUT_RESPONSE);
return;

View File

@ -513,9 +513,9 @@ namespace llarp
path->EnterState(path::ePathEstablished);
return;
}
if (m.timed_out)
if (not m)
{
log::warning(path_cat, "Path build timed out");
log::warning(path_cat, "Path build request failed!");
}
else
{

View File

@ -218,9 +218,8 @@ namespace llarp
std::unordered_set<RouterID> peer_pubkeys;
for_each_connection([&peer_pubkeys](link::Connection& conn) {
peer_pubkeys.emplace(conn.conn->remote_key());
});
for_each_connection(
[&peer_pubkeys](link::Connection& conn) { peer_pubkeys.emplace(conn.conn->remote_key()); });
loop()->call([this, &peer_pubkeys]() {
for (auto& pk : peer_pubkeys)
@ -769,8 +768,9 @@ namespace llarp
log::critical(
logcat,
"{} RCs loaded with {} bootstrap peers and {} router connections!",
"{} RCs loaded with {} RIDs, {} bootstrap peers, and {} router connections!",
_node_db->num_rcs(),
_node_db->num_rids(),
_node_db->num_bootstraps(),
num_router_connections());
@ -1139,7 +1139,7 @@ namespace llarp
log::info(logcat, "Loading NodeDB from disk...");
_node_db->load_from_disk();
_node_db->store_bootstraps();
// _node_db->store_bootstraps();
oxen::log::flush();

View File

@ -97,7 +97,7 @@ namespace llarp::rpc
return; // bail
}
LogDebug("new block at height ", m_BlockHeight);
log::trace(logcat, "new block at height {}", m_BlockHeight);
// don't upadate on block notification if an update is pending
if (not m_UpdatingList)
UpdateServiceNodeList();
@ -138,7 +138,7 @@ namespace llarp::rpc
throw std::runtime_error{"get_service_nodes did not return 'OK' status"};
if (auto it = json.find("unchanged");
it != json.end() and it->is_boolean() and it->get<bool>())
LogDebug("service node list unchanged");
log::trace(logcat, "service node list unchanged");
else
{
self->HandleNewServiceNodeList(json.at("service_node_states"));