mirror of https://github.com/oxen-io/lokinet
Compare commits
4 Commits
ea614ed141
...
d6e5aca572
Author | SHA1 | Date |
---|---|---|
dr7ana | d6e5aca572 | |
dr7ana | fd21eb3a00 | |
dr7ana | d016951d2f | |
dr7ana | fbc71847ef |
|
@ -1 +1 @@
|
|||
Subproject commit 3ace46701449c4c01f74aae2e0be3b4164768911
|
||||
Subproject commit 0e431b912eb4bf76a9219861afc06cd8ceafa781
|
|
@ -5,7 +5,7 @@ namespace llarp::link
|
|||
Connection::Connection(
|
||||
const std::shared_ptr<oxen::quic::connection_interface>& c,
|
||||
std::shared_ptr<oxen::quic::BTRequestStream>& s)
|
||||
: conn{c}, control_stream{s}/* , remote_rc{std::move(rc)} */
|
||||
: conn{c}, control_stream{s} /* , remote_rc{std::move(rc)} */
|
||||
{}
|
||||
|
||||
} // namespace llarp::link
|
||||
|
|
|
@ -11,7 +11,6 @@ namespace llarp::link
|
|||
{
|
||||
std::shared_ptr<oxen::quic::connection_interface> conn;
|
||||
std::shared_ptr<oxen::quic::BTRequestStream> control_stream;
|
||||
// std::optional<RemoteRC> remote_rc;
|
||||
|
||||
// one side of a connection will be responsible for some things, e.g. heartbeat
|
||||
bool inbound{false};
|
||||
|
|
|
@ -24,7 +24,7 @@ namespace llarp
|
|||
std::shared_ptr<link::Connection>
|
||||
Endpoint::get_conn(const RemoteRC& rc) const
|
||||
{
|
||||
if (auto itr = conns.find(rc.router_id()); itr != conns.end())
|
||||
if (auto itr = active_conns.find(rc.router_id()); itr != active_conns.end())
|
||||
return itr->second;
|
||||
|
||||
return nullptr;
|
||||
|
@ -33,7 +33,7 @@ namespace llarp
|
|||
std::shared_ptr<link::Connection>
|
||||
Endpoint::get_conn(const RouterID& rid) const
|
||||
{
|
||||
if (auto itr = conns.find(rid); itr != conns.end())
|
||||
if (auto itr = active_conns.find(rid); itr != active_conns.end())
|
||||
return itr->second;
|
||||
|
||||
return nullptr;
|
||||
|
@ -42,7 +42,7 @@ namespace llarp
|
|||
bool
|
||||
Endpoint::have_conn(const RouterID& remote, bool client_only) const
|
||||
{
|
||||
if (auto itr = conns.find(remote); itr != conns.end())
|
||||
if (auto itr = active_conns.find(remote); itr != active_conns.end())
|
||||
{
|
||||
if (not(itr->second->remote_is_relay and client_only))
|
||||
return true;
|
||||
|
@ -56,7 +56,7 @@ namespace llarp
|
|||
{
|
||||
size_t count = 0;
|
||||
|
||||
for (const auto& c : conns)
|
||||
for (const auto& c : active_conns)
|
||||
{
|
||||
if (not(c.second->remote_is_relay and clients_only))
|
||||
count += 1;
|
||||
|
@ -68,13 +68,13 @@ namespace llarp
|
|||
bool
|
||||
Endpoint::get_random_connection(RemoteRC& router) const
|
||||
{
|
||||
if (const auto size = conns.size(); size)
|
||||
if (const auto size = active_conns.size(); size)
|
||||
{
|
||||
auto itr = conns.begin();
|
||||
auto itr = active_conns.begin();
|
||||
std::advance(itr, randint() % size);
|
||||
|
||||
RouterID rid{itr->second->conn->remote_key()};
|
||||
|
||||
|
||||
if (auto maybe = link_manager.node_db->get_rc(rid))
|
||||
{
|
||||
router = *maybe;
|
||||
|
@ -91,7 +91,7 @@ namespace llarp
|
|||
void
|
||||
Endpoint::for_each_connection(std::function<void(link::Connection&)> func)
|
||||
{
|
||||
for (const auto& [rid, conn] : conns)
|
||||
for (const auto& [rid, conn] : active_conns)
|
||||
func(*conn);
|
||||
}
|
||||
|
||||
|
@ -99,14 +99,14 @@ namespace llarp
|
|||
Endpoint::close_connection(RouterID _rid)
|
||||
{
|
||||
assert(link_manager._router.loop()->inEventLoop());
|
||||
auto itr = conns.find(_rid);
|
||||
if (itr != conns.end())
|
||||
auto itr = active_conns.find(_rid);
|
||||
if (itr != active_conns.end())
|
||||
return;
|
||||
|
||||
auto& conn = *itr->second->conn;
|
||||
conn.close_connection();
|
||||
connid_map.erase(conn.scid());
|
||||
conns.erase(itr);
|
||||
active_conns.erase(itr);
|
||||
}
|
||||
|
||||
} // namespace link
|
||||
|
@ -125,8 +125,15 @@ namespace llarp
|
|||
void
|
||||
LinkManager::register_commands(std::shared_ptr<oxen::quic::BTRequestStream>& s)
|
||||
{
|
||||
assert(ep.connid_map.count(s->conn_id()));
|
||||
const RouterID& router_id = ep.connid_map[s->conn_id()];
|
||||
log::critical(logcat, "{} called", __PRETTY_FUNCTION__);
|
||||
const RouterID& router_id {s->conn.remote_key()};
|
||||
|
||||
log::critical(logcat, "Registering commands (RID:{})", router_id);
|
||||
|
||||
s->register_command("bfetch_rcs"s, [this](oxen::quic::message m) {
|
||||
_router.loop()->call(
|
||||
[this, msg = std::move(m)]() mutable { handle_fetch_bootstrap_rcs(std::move(msg)); });
|
||||
});
|
||||
|
||||
s->register_command("path_build"s, [this, rid = router_id](oxen::quic::message m) {
|
||||
_router.loop()->call(
|
||||
|
@ -143,15 +150,11 @@ namespace llarp
|
|||
[this, msg = std::move(m)]() mutable { handle_gossip_rc(std::move(msg)); });
|
||||
});
|
||||
|
||||
s->register_command("bfetch_rcs"s, [this](oxen::quic::message m) {
|
||||
_router.loop()->call(
|
||||
[this, msg = std::move(m)]() mutable { handle_fetch_bootstrap_rcs(std::move(msg)); });
|
||||
});
|
||||
|
||||
for (auto& method : direct_requests)
|
||||
{
|
||||
s->register_command(
|
||||
std::string{method.first}, [this, func = std::move(method.second)](oxen::quic::message m) {
|
||||
std::string{method.first},
|
||||
[this, func = std::move(method.second)](oxen::quic::message m) {
|
||||
_router.loop()->call([this, msg = std::move(m), func = std::move(func)]() mutable {
|
||||
auto body = msg.body_str();
|
||||
auto respond = [m = std::move(msg)](std::string response) mutable {
|
||||
|
@ -161,6 +164,24 @@ namespace llarp
|
|||
});
|
||||
});
|
||||
}
|
||||
|
||||
log::critical(logcat, "Registered all commands! (RID:{})", router_id);
|
||||
}
|
||||
|
||||
LinkManager::LinkManager(Router& r)
|
||||
: _router{r}
|
||||
, quic{std::make_unique<oxen::quic::Network>()}
|
||||
, tls_creds{oxen::quic::GNUTLSCreds::make_from_ed_keys(
|
||||
{reinterpret_cast<const char*>(_router.identity().data()), size_t{32}},
|
||||
{reinterpret_cast<const char*>(_router.identity().toPublic().data()), size_t{32}})}
|
||||
, ep{startup_endpoint(), *this}
|
||||
{}
|
||||
|
||||
std::unique_ptr<LinkManager>
|
||||
LinkManager::make(Router& r)
|
||||
{
|
||||
std::unique_ptr<LinkManager> p{new LinkManager(r)};
|
||||
return p;
|
||||
}
|
||||
|
||||
std::shared_ptr<oxen::quic::Endpoint>
|
||||
|
@ -183,25 +204,26 @@ namespace llarp
|
|||
},
|
||||
[this](oxen::quic::dgram_interface& di, bstring dgram) { recv_data_message(di, dgram); });
|
||||
tls_creds->set_key_verify_callback([this](const ustring_view& key, const ustring_view&) {
|
||||
bool result = false;
|
||||
bool result = true;
|
||||
RouterID other{key.data()};
|
||||
|
||||
if (_router.is_bootstrap_seed())
|
||||
{
|
||||
if (node_db->whitelist().count(other))
|
||||
// FIXME: remove "|| true", this is just for local testing!
|
||||
if (node_db->whitelist().count(other) || true)
|
||||
{
|
||||
log::critical(logcat, "Saving bootstrap seed requester...");
|
||||
auto [it, b] = node_db->seeds().emplace(other);
|
||||
result &= b;
|
||||
result |= b;
|
||||
}
|
||||
log::critical(
|
||||
logcat,
|
||||
"Bootstrap seed node was {} to confirm fetch requester is white-listed; saving RID",
|
||||
result ? "able" : "unable");
|
||||
"Bootstrap seed node was {} to confirm fetch requester is white-listed; {}successfully saved RID",
|
||||
result ? "able" : "unable", result ? "" : "un");
|
||||
return result;
|
||||
}
|
||||
|
||||
if (node_db->has_rc(other))
|
||||
result = true;
|
||||
result = node_db->has_rc(other);
|
||||
|
||||
log::critical(
|
||||
logcat, "{}uccessfully verified connection to {}!", result ? "S" : "Uns", other);
|
||||
|
@ -214,8 +236,10 @@ namespace llarp
|
|||
[&](oxen::quic::Connection& c,
|
||||
oxen::quic::Endpoint& e,
|
||||
std::optional<int64_t> id) -> std::shared_ptr<oxen::quic::Stream> {
|
||||
if (id && id == 0)
|
||||
|
||||
if (id && *id == 0)
|
||||
{
|
||||
log::critical(logcat, "Stream constructor constructing BTStream (ID:{})", id);
|
||||
auto s = e.make_shared<oxen::quic::BTRequestStream>(
|
||||
c, e, [](oxen::quic::Stream& s, uint64_t error_code) {
|
||||
log::warning(
|
||||
|
@ -228,28 +252,133 @@ namespace llarp
|
|||
return s;
|
||||
}
|
||||
|
||||
log::critical(logcat, "Stream constructor constructing Stream (ID:{})!", id);
|
||||
|
||||
return e.make_shared<oxen::quic::Stream>(c, e);
|
||||
});
|
||||
}
|
||||
return ep;
|
||||
}
|
||||
|
||||
LinkManager::LinkManager(Router& r)
|
||||
: _router{r}
|
||||
, quic{std::make_unique<oxen::quic::Network>()}
|
||||
, tls_creds{oxen::quic::GNUTLSCreds::make_from_ed_keys(
|
||||
{reinterpret_cast<const char*>(_router.identity().data()), size_t{32}},
|
||||
{reinterpret_cast<const char*>(_router.identity().toPublic().data()), size_t{32}})}
|
||||
, ep{startup_endpoint(), *this}
|
||||
{}
|
||||
|
||||
std::unique_ptr<LinkManager>
|
||||
LinkManager::make(Router& r)
|
||||
void
|
||||
LinkManager::on_inbound_conn(oxen::quic::connection_interface& ci)
|
||||
{
|
||||
std::unique_ptr<LinkManager> p{new LinkManager(r)};
|
||||
return p;
|
||||
const auto& scid = ci.scid();
|
||||
RouterID rid{ci.remote_key()};
|
||||
ep.connid_map.emplace(scid, rid);
|
||||
auto [itr, b] = ep.active_conns.emplace(rid, nullptr);
|
||||
|
||||
log::critical(logcat, "Queueing BTStream to be opened...");
|
||||
|
||||
auto control_stream = ci.queue_stream<oxen::quic::BTRequestStream>([](oxen::quic::Stream& s,
|
||||
uint64_t error_code) {
|
||||
log::warning(
|
||||
logcat, "BTRequestStream closed unexpectedly (ec:{}); closing connection...", error_code);
|
||||
s.conn.close_connection(error_code);
|
||||
});
|
||||
|
||||
log::critical(logcat, "Queued BTStream to be opened ID:{}", control_stream->stream_id());
|
||||
assert(control_stream->stream_id() == 0);
|
||||
// register_commands(control_stream);
|
||||
|
||||
itr->second = std::make_shared<link::Connection>(ci.shared_from_this(), control_stream);
|
||||
log::critical(logcat, "Successfully configured inbound connection fom {}...", rid);
|
||||
}
|
||||
|
||||
// TODO: should we add routes here now that Router::SessionOpen is gone?
|
||||
void
|
||||
LinkManager::on_conn_open(oxen::quic::connection_interface& ci)
|
||||
{
|
||||
_router.loop()->call([this, &conn_interface = ci]() {
|
||||
const auto rid = RouterID{conn_interface.remote_key()};
|
||||
const auto& remote = conn_interface.remote();
|
||||
const auto& scid = conn_interface.scid();
|
||||
|
||||
if (conn_interface.is_inbound())
|
||||
{
|
||||
log::critical(logcat, "Inbound connection fom {} (remote:{})", rid, remote);
|
||||
on_inbound_conn(conn_interface);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (auto itr = ep.pending_conns.find(rid); itr != ep.pending_conns.end())
|
||||
{
|
||||
ep.connid_map.emplace(scid, rid);
|
||||
auto [it, b] = ep.active_conns.emplace(rid, nullptr);
|
||||
it->second = std::move(itr->second);
|
||||
log::critical(logcat, "Connection to RID:{} moved from pending to active conns!", rid);
|
||||
}
|
||||
else
|
||||
throw std::runtime_error{"Could not find newly established connection in pending conns!"};
|
||||
}
|
||||
|
||||
log::critical(
|
||||
logcat,
|
||||
"SERVICE NODE (RID:{}) ESTABLISHED CONNECTION TO RID:{}",
|
||||
_router.local_rid(),
|
||||
rid);
|
||||
|
||||
// check to see if this connection was established while we were attempting to queue
|
||||
// messages to the remote
|
||||
if (auto itr = pending_conn_msg_queue.find(rid); itr != pending_conn_msg_queue.end())
|
||||
{
|
||||
log::critical(logcat, "Clearing pending queue for RID:{}", rid);
|
||||
auto& que = itr->second;
|
||||
|
||||
while (not que.empty())
|
||||
{
|
||||
auto& msg = que.front();
|
||||
|
||||
if (msg.is_control)
|
||||
{
|
||||
log::critical(logcat, "Dispatching {} request!", *msg.endpoint);
|
||||
ep.active_conns[rid]->control_stream->command(
|
||||
std::move(*msg.endpoint), std::move(msg.body), std::move(msg.func));
|
||||
}
|
||||
else
|
||||
{
|
||||
conn_interface.send_datagram(std::move(msg.body));
|
||||
}
|
||||
|
||||
que.pop_front();
|
||||
}
|
||||
return;
|
||||
}
|
||||
log::warning(logcat, "No pending queue to clear for RID:{}", rid);
|
||||
});
|
||||
};
|
||||
|
||||
void
|
||||
LinkManager::on_conn_closed(oxen::quic::connection_interface& ci, uint64_t ec)
|
||||
{
|
||||
_router.loop()->call([this, &conn_interface = ci, error_code = ec]() {
|
||||
const auto& scid = conn_interface.scid();
|
||||
|
||||
log::critical(quic_cat, "Purging quic connection CID:{} (ec: {})", scid, error_code);
|
||||
|
||||
if (const auto& c_itr = ep.connid_map.find(scid); c_itr != ep.connid_map.end())
|
||||
{
|
||||
const auto& rid = c_itr->second;
|
||||
|
||||
// if (auto maybe = rids_pending_verification.find(rid);
|
||||
// maybe != rids_pending_verification.end())
|
||||
// rids_pending_verification.erase(maybe);
|
||||
|
||||
// in case this didn't clear earlier, do it now
|
||||
if (auto p_itr = pending_conn_msg_queue.find(rid); p_itr != pending_conn_msg_queue.end())
|
||||
pending_conn_msg_queue.erase(p_itr);
|
||||
|
||||
if (auto m_itr = ep.active_conns.find(rid); m_itr != ep.active_conns.end())
|
||||
ep.active_conns.erase(m_itr);
|
||||
|
||||
ep.connid_map.erase(c_itr);
|
||||
|
||||
log::critical(quic_cat, "Quic connection CID:{} purged successfully", scid);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
bool
|
||||
LinkManager::send_control_message(
|
||||
const RouterID& remote,
|
||||
|
@ -291,12 +420,24 @@ namespace llarp
|
|||
endpoint = std::move(endpoint),
|
||||
body = std::move(body),
|
||||
f = std::move(func)]() {
|
||||
auto pending = PendingControlMessage(std::move(body), std::move(endpoint), f);
|
||||
auto pending = PendingMessage(std::move(body), std::move(endpoint), std::move(f));
|
||||
|
||||
if (auto it1 = ep.pending_conns.find(remote); it1 != ep.pending_conns.end())
|
||||
{
|
||||
if (auto it2 = pending_conn_msg_queue.find(remote); it2 != pending_conn_msg_queue.end())
|
||||
{
|
||||
it2->second.push_back(std::move(pending));
|
||||
log::critical(logcat, "Connection (RID:{}) is pending; message appended to send queue!", remote);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
log::critical(logcat, "Connection (RID:{}) not found in pending conns; creating send queue!", remote);
|
||||
auto [itr, b] = pending_conn_msg_queue.emplace(remote, MessageQueue());
|
||||
itr->second.push_back(std::move(pending));
|
||||
connect_to(remote);
|
||||
}
|
||||
|
||||
auto [itr, b] = pending_conn_msg_queue.emplace(remote, MessageQueue());
|
||||
itr->second.push_back(std::move(pending));
|
||||
|
||||
connect_to(remote);
|
||||
});
|
||||
|
||||
return false;
|
||||
|
@ -315,7 +456,7 @@ namespace llarp
|
|||
}
|
||||
|
||||
_router.loop()->call([this, body = std::move(body), remote]() {
|
||||
auto pending = PendingDataMessage(body);
|
||||
auto pending = PendingMessage(std::move(body));
|
||||
|
||||
auto [itr, b] = pending_conn_msg_queue.emplace(remote, MessageQueue());
|
||||
itr->second.push_back(std::move(pending));
|
||||
|
@ -385,107 +526,6 @@ namespace llarp
|
|||
log::warning(quic_cat, "Failed to begin establishing connection to {}", remote_addr);
|
||||
}
|
||||
|
||||
void
|
||||
LinkManager::on_inbound_conn(oxen::quic::connection_interface& ci)
|
||||
{
|
||||
const auto& scid = ci.scid();
|
||||
RouterID rid{ci.remote_key()};
|
||||
ep.connid_map.emplace(scid, rid);
|
||||
auto [itr, b] = ep.conns.emplace(rid, nullptr);
|
||||
|
||||
auto control_stream = ci.queue_stream<oxen::quic::BTRequestStream>([](oxen::quic::Stream& s,
|
||||
uint64_t error_code) {
|
||||
log::warning(
|
||||
logcat, "BTRequestStream closed unexpectedly (ec:{}); closing connection...", error_code);
|
||||
s.conn.close_connection(error_code);
|
||||
});
|
||||
log::critical(logcat, "Opened BTStream ID:{}", control_stream->stream_id());
|
||||
|
||||
itr->second = std::make_shared<link::Connection>(ci.shared_from_this(), control_stream);
|
||||
log::critical(logcat, "Successfully configured inbound connection fom {}...", rid);
|
||||
}
|
||||
|
||||
// TODO: should we add routes here now that Router::SessionOpen is gone?
|
||||
void
|
||||
LinkManager::on_conn_open(oxen::quic::connection_interface& ci)
|
||||
{
|
||||
_router.loop()->call([this, &conn_interface = ci]() {
|
||||
const auto rid = RouterID{conn_interface.remote_key()};
|
||||
const auto& remote = conn_interface.remote();
|
||||
|
||||
if (conn_interface.is_inbound())
|
||||
{
|
||||
log::critical(logcat, "Inbound connection fom {} (remote:{})", rid, remote);
|
||||
on_inbound_conn(conn_interface);
|
||||
}
|
||||
|
||||
log::critical(
|
||||
logcat,
|
||||
"SERVICE NODE (RID:{}) ESTABLISHED CONNECTION TO RID:{}",
|
||||
_router.local_rid(),
|
||||
rid);
|
||||
|
||||
// check to see if this connection was established while we were attempting to queue
|
||||
// messages to the remote
|
||||
if (auto itr = pending_conn_msg_queue.find(rid); itr != pending_conn_msg_queue.end())
|
||||
{
|
||||
log::critical(logcat, "Clearing pending queue for RID:{}", rid);
|
||||
auto& que = itr->second;
|
||||
|
||||
while (not que.empty())
|
||||
{
|
||||
auto& m = que.front();
|
||||
|
||||
if (m.is_control)
|
||||
{
|
||||
auto& msg = reinterpret_cast<PendingControlMessage&>(m);
|
||||
log::critical(logcat, "Dispatching {} request!", msg.endpoint);
|
||||
ep.conns[rid]->control_stream->command(msg.endpoint, msg.body, msg.func);
|
||||
}
|
||||
else
|
||||
{
|
||||
auto& msg = reinterpret_cast<PendingDataMessage&>(m);
|
||||
conn_interface.send_datagram(std::move(msg.body));
|
||||
}
|
||||
|
||||
que.pop_front();
|
||||
}
|
||||
return;
|
||||
}
|
||||
log::warning(logcat, "No pending queue to clear for RID:{}", rid);
|
||||
});
|
||||
};
|
||||
|
||||
void
|
||||
LinkManager::on_conn_closed(oxen::quic::connection_interface& ci, uint64_t ec)
|
||||
{
|
||||
_router.loop()->call([this, &conn_interface = ci, error_code = ec]() {
|
||||
const auto& scid = conn_interface.scid();
|
||||
|
||||
log::critical(quic_cat, "Purging quic connection CID:{} (ec: {})", scid, error_code);
|
||||
|
||||
if (const auto& c_itr = ep.connid_map.find(scid); c_itr != ep.connid_map.end())
|
||||
{
|
||||
const auto& rid = c_itr->second;
|
||||
|
||||
// if (auto maybe = rids_pending_verification.find(rid);
|
||||
// maybe != rids_pending_verification.end())
|
||||
// rids_pending_verification.erase(maybe);
|
||||
|
||||
// in case this didn't clear earlier, do it now
|
||||
if (auto p_itr = pending_conn_msg_queue.find(rid); p_itr != pending_conn_msg_queue.end())
|
||||
pending_conn_msg_queue.erase(p_itr);
|
||||
|
||||
if (auto m_itr = ep.conns.find(rid); m_itr != ep.conns.end())
|
||||
ep.conns.erase(m_itr);
|
||||
|
||||
ep.connid_map.erase(c_itr);
|
||||
|
||||
log::critical(quic_cat, "Quic connection CID:{} purged successfully", scid);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
bool
|
||||
LinkManager::have_connection_to(const RouterID& remote, bool client_only) const
|
||||
{
|
||||
|
@ -598,7 +638,7 @@ namespace llarp
|
|||
void
|
||||
LinkManager::gossip_rc(const RouterID& rc_rid, std::string serialized_rc)
|
||||
{
|
||||
for (auto& [rid, conn] : ep.conns)
|
||||
for (auto& [rid, conn] : ep.active_conns)
|
||||
{
|
||||
// don't send back to the owner...
|
||||
if (rid == rc_rid)
|
||||
|
@ -607,7 +647,9 @@ namespace llarp
|
|||
if (not conn->remote_is_relay)
|
||||
continue;
|
||||
|
||||
send_control_message(rid, "gossip_rc", serialized_rc);
|
||||
send_control_message(rid, "gossip_rc", serialized_rc, [](oxen::quic::message) mutable {
|
||||
log::critical(logcat, "PLACEHOLDER FOR GOSSIP RC RESPONSE HANDLER");
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -647,7 +689,7 @@ namespace llarp
|
|||
}
|
||||
|
||||
log::critical(logcat, "Queuing bootstrap fetch request to {}", source.router_id());
|
||||
auto pending = PendingControlMessage(std::move(payload), "bfetch_rcs"s, std::move(f));
|
||||
auto pending = PendingMessage(std::move(payload), "bfetch_rcs"s, std::move(f));
|
||||
|
||||
auto [itr, b] = pending_conn_msg_queue.emplace(source.router_id(), MessageQueue());
|
||||
itr->second.push_back(std::move(pending));
|
||||
|
@ -688,22 +730,26 @@ namespace llarp
|
|||
// TODO: if we are not the seed, how do we check the requester
|
||||
if (is_seed)
|
||||
{
|
||||
// we already insert the
|
||||
// we already insert the
|
||||
auto& seeds = node_db->seeds();
|
||||
|
||||
|
||||
if (auto itr = seeds.find(rid); itr != seeds.end())
|
||||
{
|
||||
log::critical(logcat, "Bootstrap seed confirmed RID:{} is white-listed seeds; approving fetch request and saving RC!", rid);
|
||||
log::critical(
|
||||
logcat,
|
||||
"Bootstrap seed confirmed RID:{} is white-listed seeds; approving fetch request and "
|
||||
"saving RC!",
|
||||
rid);
|
||||
node_db->put_rc(remote);
|
||||
}
|
||||
}
|
||||
|
||||
auto& src = is_seed ? node_db->bootstrap_seeds() : node_db->get_known_rcs();
|
||||
auto& src = node_db->get_known_rcs();
|
||||
auto count = src.size();
|
||||
|
||||
if (count == 0)
|
||||
{
|
||||
log::error(logcat, "No {} locally to send!", is_seed ? "bootstrap seeds" : "known RCs");
|
||||
log::error(logcat, "No known RCs locally to send!");
|
||||
m.respond(messages::ERROR_RESPONSE, true);
|
||||
return;
|
||||
}
|
||||
|
@ -895,9 +941,9 @@ namespace llarp
|
|||
void
|
||||
LinkManager::handle_find_name_response(oxen::quic::message m)
|
||||
{
|
||||
if (m.timed_out)
|
||||
if (not m)
|
||||
{
|
||||
log::info(link_cat, "FindNameMessage timed out!");
|
||||
log::info(link_cat, "FindNameMessage failed!");
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1031,7 +1077,7 @@ namespace llarp
|
|||
"publish_intro",
|
||||
PublishIntroMessage::serialize(introset, relay_order, is_relayed),
|
||||
[respond = std::move(respond)](oxen::quic::message m) {
|
||||
if (m.timed_out)
|
||||
if (not m)
|
||||
return; // drop if timed out; requester will have timed out as well
|
||||
respond(m.body_str());
|
||||
});
|
||||
|
@ -1069,7 +1115,7 @@ namespace llarp
|
|||
void
|
||||
LinkManager::handle_publish_intro_response(oxen::quic::message m)
|
||||
{
|
||||
if (m.timed_out)
|
||||
if (not m)
|
||||
{
|
||||
log::info(link_cat, "PublishIntroMessage timed out!");
|
||||
return;
|
||||
|
@ -1172,7 +1218,7 @@ namespace llarp
|
|||
link_cat,
|
||||
"Relayed FindIntroMessage returned successful response; transmitting to initial "
|
||||
"requester");
|
||||
else if (relay_response.timed_out)
|
||||
else if (not relay_response)
|
||||
log::critical(
|
||||
link_cat, "Relayed FindIntroMessage timed out! Notifying initial requester");
|
||||
else
|
||||
|
@ -1199,7 +1245,7 @@ namespace llarp
|
|||
void
|
||||
LinkManager::handle_find_intro_response(oxen::quic::message m)
|
||||
{
|
||||
if (m.timed_out)
|
||||
if (not m)
|
||||
{
|
||||
log::info(link_cat, "FindIntroMessage timed out!");
|
||||
return;
|
||||
|
@ -1392,7 +1438,7 @@ namespace llarp
|
|||
"then relaying response");
|
||||
_router.path_context().put_transit_hop(hop);
|
||||
}
|
||||
if (m.timed_out)
|
||||
if (not m)
|
||||
log::info(link_cat, "Upstream timed out on path build; relaying timeout");
|
||||
else
|
||||
log::info(link_cat, "Upstream returned path build failure; relaying response");
|
||||
|
@ -1509,7 +1555,7 @@ namespace llarp
|
|||
void
|
||||
LinkManager::handle_obtain_exit_response(oxen::quic::message m)
|
||||
{
|
||||
if (m.timed_out)
|
||||
if (not m)
|
||||
{
|
||||
log::info(link_cat, "ObtainExitMessage timed out!");
|
||||
return;
|
||||
|
@ -1587,7 +1633,7 @@ namespace llarp
|
|||
void
|
||||
LinkManager::handle_update_exit_response(oxen::quic::message m)
|
||||
{
|
||||
if (m.timed_out)
|
||||
if (not m)
|
||||
{
|
||||
log::info(link_cat, "UpdateExitMessage timed out!");
|
||||
return;
|
||||
|
@ -1672,7 +1718,7 @@ namespace llarp
|
|||
void
|
||||
LinkManager::handle_close_exit_response(oxen::quic::message m)
|
||||
{
|
||||
if (m.timed_out)
|
||||
if (not m)
|
||||
{
|
||||
log::info(link_cat, "CloseExitMessage timed out!");
|
||||
return;
|
||||
|
@ -1824,7 +1870,7 @@ namespace llarp
|
|||
void
|
||||
LinkManager::handle_convo_intro(oxen::quic::message m)
|
||||
{
|
||||
if (m.timed_out)
|
||||
if (not m)
|
||||
{
|
||||
log::info(link_cat, "Path control message timed out!");
|
||||
return;
|
||||
|
|
|
@ -48,9 +48,12 @@ namespace llarp
|
|||
|
||||
// for outgoing packets, we route via RouterID; map RouterID->Connection
|
||||
// for incoming packets, we get a ConnectionID; map ConnectionID->RouterID
|
||||
std::unordered_map<RouterID, std::shared_ptr<link::Connection>> conns;
|
||||
std::unordered_map<RouterID, std::shared_ptr<link::Connection>> active_conns;
|
||||
std::unordered_map<oxen::quic::ConnectionID, RouterID> connid_map;
|
||||
|
||||
// for pending connections, cleared in LinkManager::on_conn_open
|
||||
std::unordered_map<RouterID, std::shared_ptr<link::Connection>> pending_conns;
|
||||
|
||||
// TODO: see which of these is actually useful and delete the other
|
||||
std::shared_ptr<link::Connection>
|
||||
get_conn(const RemoteRC&) const;
|
||||
|
@ -109,27 +112,18 @@ namespace llarp
|
|||
struct PendingMessage
|
||||
{
|
||||
std::string body;
|
||||
std::optional<std::string> endpoint = std::nullopt;
|
||||
std::function<void(oxen::quic::message)> func = nullptr;
|
||||
|
||||
RouterID rid;
|
||||
bool is_control{false};
|
||||
bool is_control = false;
|
||||
|
||||
PendingMessage(std::string b, bool control = false) : body{std::move(b)}, is_control{control}
|
||||
PendingMessage(std::string b) : body{std::move(b)}
|
||||
{}
|
||||
};
|
||||
|
||||
struct PendingDataMessage : PendingMessage
|
||||
{
|
||||
PendingDataMessage(std::string b) : PendingMessage(b)
|
||||
{}
|
||||
};
|
||||
|
||||
struct PendingControlMessage : PendingMessage
|
||||
{
|
||||
std::string endpoint;
|
||||
std::function<void(oxen::quic::message)> func;
|
||||
|
||||
PendingControlMessage(
|
||||
std::string b, std::string e, std::function<void(oxen::quic::message)> f = nullptr)
|
||||
: PendingMessage(b, true), endpoint{std::move(e)}, func{std::move(f)}
|
||||
PendingMessage(
|
||||
std::string b, std::string ep, std::function<void(oxen::quic::message)> f = nullptr)
|
||||
: body{std::move(b)}, endpoint{std::move(ep)}, func{std::move(f)}, is_control{true}
|
||||
{}
|
||||
};
|
||||
|
||||
|
@ -404,16 +398,14 @@ namespace llarp
|
|||
{
|
||||
try
|
||||
{
|
||||
log::critical(logcat, "Establishing connection to {}", remote);
|
||||
const auto& rid = rc.router_id();
|
||||
log::critical(logcat, "Establishing connection to RID:{}", rid);
|
||||
|
||||
auto conn_interface =
|
||||
endpoint->connect(remote, link_manager.tls_creds, std::forward<Opt>(opts)...);
|
||||
|
||||
// emplace immediately for connection open callback to find scid
|
||||
connid_map.emplace(conn_interface->scid(), rc.router_id());
|
||||
auto [itr, b] = conns.emplace(rc.router_id(), nullptr);
|
||||
|
||||
log::critical(logcat, "Establishing connection to {}...", rc.router_id());
|
||||
// add to pending conns
|
||||
auto [itr, b] = pending_conns.emplace(rid, nullptr);
|
||||
|
||||
auto control_stream = conn_interface->template get_new_stream<oxen::quic::BTRequestStream>(
|
||||
[](oxen::quic::Stream& s, uint64_t error_code) {
|
||||
|
@ -427,6 +419,7 @@ namespace llarp
|
|||
link_manager.register_commands(control_stream);
|
||||
itr->second = std::make_shared<link::Connection>(conn_interface, control_stream);
|
||||
|
||||
log::critical(logcat, "Connection to RID:{} added to pending connections...", rid);
|
||||
return true;
|
||||
}
|
||||
catch (...)
|
||||
|
|
|
@ -379,16 +379,16 @@ namespace llarp
|
|||
src,
|
||||
FetchRCMessage::serialize(_router.last_rc_fetch, needed),
|
||||
[this, src, initial](oxen::quic::message m) mutable {
|
||||
if (m.timed_out)
|
||||
if (not m)
|
||||
{
|
||||
log::info(logcat, "RC fetch to {} timed out", src);
|
||||
log::info(logcat, "RC fetch to {} failed!", src);
|
||||
fetch_rcs_result(initial, true);
|
||||
return;
|
||||
}
|
||||
try
|
||||
{
|
||||
oxenc::bt_dict_consumer btdc{m.body()};
|
||||
|
||||
// TODO: fix this shit after removing ::timed_out from message type
|
||||
if (not m)
|
||||
{
|
||||
auto reason = btdc.require<std::string_view>(messages::STATUS_KEY);
|
||||
|
@ -668,13 +668,14 @@ namespace llarp
|
|||
_needs_rebootstrap = false;
|
||||
++bootstrap_attempts;
|
||||
|
||||
log::critical(
|
||||
logcat, "Dispatching BootstrapRC fetch request to {}", _bootstraps.current().view());
|
||||
log::critical(logcat, "Dispatching BootstrapRC fetch request to {}", fetch_source);
|
||||
|
||||
_router.link_manager().fetch_bootstrap_rcs(
|
||||
rc,
|
||||
BootstrapFetchMessage::serialize(_router.router_contact, BOOTSTRAP_SOURCE_COUNT),
|
||||
[this](oxen::quic::message m) mutable {
|
||||
log::critical(logcat, "Received response to BootstrapRC fetch request...");
|
||||
|
||||
if (not m)
|
||||
{
|
||||
// ++bootstrap_attempts;
|
||||
|
@ -688,7 +689,8 @@ namespace llarp
|
|||
return;
|
||||
}
|
||||
|
||||
std::set<RouterID> rids;
|
||||
// std::set<RouterID> rids;
|
||||
size_t num = 0;
|
||||
|
||||
try
|
||||
{
|
||||
|
@ -700,7 +702,8 @@ namespace llarp
|
|||
while (not btlc.is_finished())
|
||||
{
|
||||
auto rc = RemoteRC{btlc.consume_dict_data()};
|
||||
rids.emplace(rc.router_id());
|
||||
put_rc(rc);
|
||||
++num;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -723,23 +726,30 @@ namespace llarp
|
|||
// next call to fallback_to_bootstrap() and hit the base case, rotating sources
|
||||
// bootstrap_attempts = MAX_BOOTSTRAP_FETCH_ATTEMPTS;
|
||||
|
||||
if (rids.size() == BOOTSTRAP_SOURCE_COUNT)
|
||||
{
|
||||
known_rids.merge(rids);
|
||||
fetch_initial();
|
||||
}
|
||||
else
|
||||
{
|
||||
// ++bootstrap_attempts;
|
||||
log::warning(
|
||||
logcat,
|
||||
"BootstrapRC fetch response from {} returned insufficient number of RC's (error "
|
||||
"{}/{})",
|
||||
fetch_source,
|
||||
bootstrap_attempts,
|
||||
MAX_BOOTSTRAP_FETCH_ATTEMPTS);
|
||||
fallback_to_bootstrap();
|
||||
}
|
||||
// const auto& num = rids.size();
|
||||
|
||||
log::critical(logcat, "BootstrapRC fetch response from {} returned {}/{} needed RCs", fetch_source, num, BOOTSTRAP_SOURCE_COUNT);
|
||||
// known_rids.merge(rids);
|
||||
fetch_initial();
|
||||
|
||||
// FIXME: when moving to testnet, uncomment this
|
||||
// if (rids.size() == BOOTSTRAP_SOURCE_COUNT)
|
||||
// {
|
||||
// known_rids.merge(rids);
|
||||
// fetch_initial();
|
||||
// }
|
||||
// else
|
||||
// {
|
||||
// // ++bootstrap_attempts;
|
||||
// log::warning(
|
||||
// logcat,
|
||||
// "BootstrapRC fetch response from {} returned insufficient number of RC's (error "
|
||||
// "{}/{})",
|
||||
// fetch_source,
|
||||
// bootstrap_attempts,
|
||||
// MAX_BOOTSTRAP_FETCH_ATTEMPTS);
|
||||
// fallback_to_bootstrap();
|
||||
// }
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -954,9 +964,6 @@ namespace llarp
|
|||
{
|
||||
const auto& rid = rc.router_id();
|
||||
|
||||
if (not want_rc(rid))
|
||||
return false;
|
||||
|
||||
known_rcs.erase(rc);
|
||||
rc_lookup.erase(rid);
|
||||
|
||||
|
@ -974,6 +981,12 @@ namespace llarp
|
|||
return known_rcs.size();
|
||||
}
|
||||
|
||||
size_t
|
||||
NodeDB::num_rids() const
|
||||
{
|
||||
return known_rids.size();
|
||||
}
|
||||
|
||||
bool
|
||||
NodeDB::put_rc_if_newer(RemoteRC rc, rc_time now)
|
||||
{
|
||||
|
|
|
@ -403,6 +403,9 @@ namespace llarp
|
|||
size_t
|
||||
num_rcs() const;
|
||||
|
||||
size_t
|
||||
num_rids() const;
|
||||
|
||||
/// do periodic tasks like flush to disk and expiration
|
||||
void
|
||||
Tick(llarp_time_t now);
|
||||
|
|
|
@ -118,7 +118,7 @@ namespace llarp::path
|
|||
if ((not self) or (not response_cb))
|
||||
return;
|
||||
|
||||
if (m.timed_out)
|
||||
if (not m)
|
||||
{
|
||||
response_cb(messages::TIMEOUT_RESPONSE);
|
||||
return;
|
||||
|
|
|
@ -513,9 +513,9 @@ namespace llarp
|
|||
path->EnterState(path::ePathEstablished);
|
||||
return;
|
||||
}
|
||||
if (m.timed_out)
|
||||
if (not m)
|
||||
{
|
||||
log::warning(path_cat, "Path build timed out");
|
||||
log::warning(path_cat, "Path build request failed!");
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
|
@ -218,9 +218,8 @@ namespace llarp
|
|||
|
||||
std::unordered_set<RouterID> peer_pubkeys;
|
||||
|
||||
for_each_connection([&peer_pubkeys](link::Connection& conn) {
|
||||
peer_pubkeys.emplace(conn.conn->remote_key());
|
||||
});
|
||||
for_each_connection(
|
||||
[&peer_pubkeys](link::Connection& conn) { peer_pubkeys.emplace(conn.conn->remote_key()); });
|
||||
|
||||
loop()->call([this, &peer_pubkeys]() {
|
||||
for (auto& pk : peer_pubkeys)
|
||||
|
@ -769,8 +768,9 @@ namespace llarp
|
|||
|
||||
log::critical(
|
||||
logcat,
|
||||
"{} RCs loaded with {} bootstrap peers and {} router connections!",
|
||||
"{} RCs loaded with {} RIDs, {} bootstrap peers, and {} router connections!",
|
||||
_node_db->num_rcs(),
|
||||
_node_db->num_rids(),
|
||||
_node_db->num_bootstraps(),
|
||||
num_router_connections());
|
||||
|
||||
|
@ -1139,7 +1139,7 @@ namespace llarp
|
|||
|
||||
log::info(logcat, "Loading NodeDB from disk...");
|
||||
_node_db->load_from_disk();
|
||||
_node_db->store_bootstraps();
|
||||
// _node_db->store_bootstraps();
|
||||
|
||||
oxen::log::flush();
|
||||
|
||||
|
|
|
@ -97,7 +97,7 @@ namespace llarp::rpc
|
|||
return; // bail
|
||||
}
|
||||
|
||||
LogDebug("new block at height ", m_BlockHeight);
|
||||
log::trace(logcat, "new block at height {}", m_BlockHeight);
|
||||
// don't upadate on block notification if an update is pending
|
||||
if (not m_UpdatingList)
|
||||
UpdateServiceNodeList();
|
||||
|
@ -138,7 +138,7 @@ namespace llarp::rpc
|
|||
throw std::runtime_error{"get_service_nodes did not return 'OK' status"};
|
||||
if (auto it = json.find("unchanged");
|
||||
it != json.end() and it->is_boolean() and it->get<bool>())
|
||||
LogDebug("service node list unchanged");
|
||||
log::trace(logcat, "service node list unchanged");
|
||||
else
|
||||
{
|
||||
self->HandleNewServiceNodeList(json.at("service_node_states"));
|
||||
|
|
Loading…
Reference in New Issue