Fixed pending message queue weirdness

This commit is contained in:
dr7ana 2023-12-13 05:49:59 -08:00
parent fbc71847ef
commit d016951d2f
5 changed files with 32 additions and 37 deletions

View File

@ -5,7 +5,7 @@ namespace llarp::link
Connection::Connection(
const std::shared_ptr<oxen::quic::connection_interface>& c,
std::shared_ptr<oxen::quic::BTRequestStream>& s)
: conn{c}, control_stream{s}/* , remote_rc{std::move(rc)} */
: conn{c}, control_stream{s} /* , remote_rc{std::move(rc)} */
{}
} // namespace llarp::link

View File

@ -74,7 +74,7 @@ namespace llarp
std::advance(itr, randint() % size);
RouterID rid{itr->second->conn->remote_key()};
if (auto maybe = link_manager.node_db->get_rc(rid))
{
router = *maybe;
@ -151,7 +151,8 @@ namespace llarp
for (auto& method : direct_requests)
{
s->register_command(
std::string{method.first}, [this, func = std::move(method.second)](oxen::quic::message m) {
std::string{method.first},
[this, func = std::move(method.second)](oxen::quic::message m) {
_router.loop()->call([this, msg = std::move(m), func = std::move(func)]() mutable {
auto body = msg.body_str();
auto respond = [m = std::move(msg)](std::string response) mutable {
@ -291,7 +292,7 @@ namespace llarp
endpoint = std::move(endpoint),
body = std::move(body),
f = std::move(func)]() {
auto pending = PendingControlMessage(std::move(body), std::move(endpoint), f);
auto pending = PendingMessage(std::move(body), std::move(endpoint), std::move(f));
auto [itr, b] = pending_conn_msg_queue.emplace(remote, MessageQueue());
itr->second.push_back(std::move(pending));
@ -315,7 +316,7 @@ namespace llarp
}
_router.loop()->call([this, body = std::move(body), remote]() {
auto pending = PendingDataMessage(body);
auto pending = PendingMessage(std::move(body));
auto [itr, b] = pending_conn_msg_queue.emplace(remote, MessageQueue());
itr->second.push_back(std::move(pending));
@ -434,17 +435,16 @@ namespace llarp
while (not que.empty())
{
auto& m = que.front();
auto& msg = que.front();
if (m.is_control)
if (msg.is_control)
{
auto& msg = reinterpret_cast<PendingControlMessage&>(m);
log::critical(logcat, "Dispatching {} request!", msg.endpoint);
ep.conns[rid]->control_stream->command(msg.endpoint, msg.body, msg.func);
log::critical(logcat, "Dispatching {} request!", *msg.endpoint);
ep.conns[rid]->control_stream->command(
std::move(*msg.endpoint), std::move(msg.body), std::move(msg.func));
}
else
{
auto& msg = reinterpret_cast<PendingDataMessage&>(m);
conn_interface.send_datagram(std::move(msg.body));
}
@ -647,7 +647,7 @@ namespace llarp
}
log::critical(logcat, "Queuing bootstrap fetch request to {}", source.router_id());
auto pending = PendingControlMessage(std::move(payload), "bfetch_rcs"s, std::move(f));
auto pending = PendingMessage(std::move(payload), "bfetch_rcs"s, std::move(f));
auto [itr, b] = pending_conn_msg_queue.emplace(source.router_id(), MessageQueue());
itr->second.push_back(std::move(pending));
@ -688,12 +688,16 @@ namespace llarp
// TODO: if we are not the seed, how do we check the requester
if (is_seed)
{
// we already insert the
// we already insert the
auto& seeds = node_db->seeds();
if (auto itr = seeds.find(rid); itr != seeds.end())
{
log::critical(logcat, "Bootstrap seed confirmed RID:{} is white-listed seeds; approving fetch request and saving RC!", rid);
log::critical(
logcat,
"Bootstrap seed confirmed RID:{} is white-listed seeds; approving fetch request and "
"saving RC!",
rid);
node_db->put_rc(remote);
}
}

View File

@ -109,27 +109,18 @@ namespace llarp
struct PendingMessage
{
std::string body;
std::optional<std::string> endpoint = std::nullopt;
std::function<void(oxen::quic::message)> func = nullptr;
RouterID rid;
bool is_control{false};
bool is_control = false;
PendingMessage(std::string b, bool control = false) : body{std::move(b)}, is_control{control}
PendingMessage(std::string b) : body{std::move(b)}
{}
};
struct PendingDataMessage : PendingMessage
{
PendingDataMessage(std::string b) : PendingMessage(b)
{}
};
struct PendingControlMessage : PendingMessage
{
std::string endpoint;
std::function<void(oxen::quic::message)> func;
PendingControlMessage(
std::string b, std::string e, std::function<void(oxen::quic::message)> f = nullptr)
: PendingMessage(b, true), endpoint{std::move(e)}, func{std::move(f)}
PendingMessage(
std::string b, std::string ep, std::function<void(oxen::quic::message)> f = nullptr)
: body{std::move(b)}, endpoint{std::move(ep)}, func{std::move(f)}, is_control{true}
{}
};

View File

@ -668,13 +668,14 @@ namespace llarp
_needs_rebootstrap = false;
++bootstrap_attempts;
log::critical(
logcat, "Dispatching BootstrapRC fetch request to {}", _bootstraps.current().view());
log::critical(logcat, "Dispatching BootstrapRC fetch request to {}", fetch_source);
_router.link_manager().fetch_bootstrap_rcs(
rc,
BootstrapFetchMessage::serialize(_router.router_contact, BOOTSTRAP_SOURCE_COUNT),
[this](oxen::quic::message m) mutable {
log::critical(logcat, "Received response to BootstrapRC fetch request...");
if (not m)
{
// ++bootstrap_attempts;

View File

@ -218,9 +218,8 @@ namespace llarp
std::unordered_set<RouterID> peer_pubkeys;
for_each_connection([&peer_pubkeys](link::Connection& conn) {
peer_pubkeys.emplace(conn.conn->remote_key());
});
for_each_connection(
[&peer_pubkeys](link::Connection& conn) { peer_pubkeys.emplace(conn.conn->remote_key()); });
loop()->call([this, &peer_pubkeys]() {
for (auto& pk : peer_pubkeys)