mirror of
https://github.com/oxen-io/lokinet
synced 2023-12-14 06:53:00 +01:00
less synchronous for the subsequent fetches
This commit is contained in:
parent
6559617816
commit
3fc7980691
|
@ -457,6 +457,14 @@ namespace llarp
|
||||||
|
|
||||||
auto since_time = rc_time{std::chrono::seconds{btdc.require<int64_t>("since")}};
|
auto since_time = rc_time{std::chrono::seconds{btdc.require<int64_t>("since")}};
|
||||||
|
|
||||||
|
std::unordered_set<RouterID> explicit_relays;
|
||||||
|
|
||||||
|
// Initial fetch: give me all the RC's
|
||||||
|
if (explicit_ids.empty())
|
||||||
|
{
|
||||||
|
// TODO: this
|
||||||
|
}
|
||||||
|
|
||||||
if (explicit_ids.size() > (rcs.size() / 4))
|
if (explicit_ids.size() > (rcs.size() / 4))
|
||||||
{
|
{
|
||||||
log::info(
|
log::info(
|
||||||
|
@ -465,8 +473,6 @@ namespace llarp
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::unordered_set<RouterID> explicit_relays;
|
|
||||||
|
|
||||||
for (auto& sv : explicit_ids)
|
for (auto& sv : explicit_ids)
|
||||||
{
|
{
|
||||||
if (sv.size() != RouterID::SIZE)
|
if (sv.size() != RouterID::SIZE)
|
||||||
|
@ -478,10 +484,10 @@ namespace llarp
|
||||||
explicit_relays.emplace(reinterpret_cast<const byte_t*>(sv.data()));
|
explicit_relays.emplace(reinterpret_cast<const byte_t*>(sv.data()));
|
||||||
}
|
}
|
||||||
|
|
||||||
oxenc::bt_dict_producer resp;
|
oxenc::bt_dict_producer btdp;
|
||||||
|
|
||||||
{
|
{
|
||||||
auto rc_bt_list = resp.append_list("rcs");
|
auto rc_sublist = btdp.append_list("rcs");
|
||||||
|
|
||||||
const auto& last_time = node_db->get_last_rc_update_times();
|
const auto& last_time = node_db->get_last_rc_update_times();
|
||||||
|
|
||||||
|
@ -492,13 +498,13 @@ namespace llarp
|
||||||
for (const auto& [_, rc] : rcs)
|
for (const auto& [_, rc] : rcs)
|
||||||
{
|
{
|
||||||
if (last_time.at(rc.router_id()) > since_time or explicit_relays.count(rc.router_id()))
|
if (last_time.at(rc.router_id()) > since_time or explicit_relays.count(rc.router_id()))
|
||||||
rc_bt_list.append_encoded(rc.view());
|
rc_sublist.append_encoded(rc.view());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
resp.append("time", now.time_since_epoch().count());
|
btdp.append("time", now.time_since_epoch().count());
|
||||||
|
|
||||||
m.respond(std::move(resp).str());
|
m.respond(std::move(btdp).str());
|
||||||
}
|
}
|
||||||
catch (const std::exception& e)
|
catch (const std::exception& e)
|
||||||
{
|
{
|
||||||
|
|
416
llarp/nodedb.cpp
416
llarp/nodedb.cpp
|
@ -125,12 +125,12 @@ namespace llarp
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
RouterID temp = rc_fetch_source;
|
RouterID temp = fetch_source;
|
||||||
|
|
||||||
while (temp == rc_fetch_source)
|
while (temp == fetch_source)
|
||||||
std::sample(active_client_routers.begin(), active_client_routers.end(), &temp, 1, csrng);
|
std::sample(active_client_routers.begin(), active_client_routers.end(), &temp, 1, csrng);
|
||||||
|
|
||||||
rc_fetch_source = std::move(temp);
|
fetch_source = std::move(temp);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -156,56 +156,55 @@ namespace llarp
|
||||||
if (conn_count == 1)
|
if (conn_count == 1)
|
||||||
{
|
{
|
||||||
// if we only have one connection, it must be current rc fetch source
|
// if we only have one connection, it must be current rc fetch source
|
||||||
assert(new_source.router_id() == rc_fetch_source);
|
assert(new_source.router_id() == fetch_source);
|
||||||
|
|
||||||
if (pinned_edges.size() == 1)
|
if (pinned_edges.size() == 1)
|
||||||
{
|
{
|
||||||
// only one pinned edge set, use it even though it gave unsatisfactory RCs
|
// only one pinned edge set, use it even though it gave unsatisfactory RCs
|
||||||
assert(rc_fetch_source == *(pinned_edges.begin()));
|
assert(fetch_source == *(pinned_edges.begin()));
|
||||||
log::warning(
|
log::warning(
|
||||||
logcat,
|
logcat,
|
||||||
"Single pinned edge {} gave bad RC response; still using it despite this.",
|
"Single pinned edge {} gave bad RC response; still using it despite this.",
|
||||||
rc_fetch_source);
|
fetch_source);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// only one connection, choose a new relay to connect to for rc fetching
|
// only one connection, choose a new relay to connect to for rc fetching
|
||||||
RouterID r = rc_fetch_source;
|
RouterID r = fetch_source;
|
||||||
|
|
||||||
while (r == rc_fetch_source)
|
while (r == fetch_source)
|
||||||
{
|
{
|
||||||
std::sample(active_client_routers.begin(), active_client_routers.end(), &r, 1, csrng);
|
std::sample(active_client_routers.begin(), active_client_routers.end(), &r, 1, csrng);
|
||||||
}
|
}
|
||||||
rc_fetch_source = std::move(r);
|
fetch_source = std::move(r);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// choose one of our other existing connections to use as the RC fetch source
|
// choose one of our other existing connections to use as the RC fetch source
|
||||||
while (new_source.router_id() == rc_fetch_source)
|
while (new_source.router_id() == fetch_source)
|
||||||
{
|
{
|
||||||
_router.link_manager().get_random_connected(new_source);
|
_router.link_manager().get_random_connected(new_source);
|
||||||
}
|
}
|
||||||
rc_fetch_source = new_source.router_id();
|
|
||||||
|
fetch_source = new_source.router_id();
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: trust model
|
// TODO: trust model
|
||||||
void
|
bool
|
||||||
NodeDB::store_fetched_rcs(RouterID source, std::vector<RemoteRC> rcs, rc_time timestamp)
|
NodeDB::process_fetched_rcs(RouterID source, std::vector<RemoteRC> rcs, rc_time timestamp)
|
||||||
{
|
{
|
||||||
(void)source;
|
fetch_source = source;
|
||||||
|
|
||||||
// TODO: if we don't currently have a "trusted" relay we've been fetching from,
|
/*
|
||||||
// this will be a full list of RCs. We need to first check if it aligns closely
|
TODO: trust model analyzing returned list of RCs
|
||||||
// with our trusted RouterID list, then replace our RCs with the incoming set.
|
*/
|
||||||
|
|
||||||
for (auto& rc : rcs)
|
for (auto& rc : rcs)
|
||||||
put_rc_if_newer(std::move(rc), timestamp);
|
put_rc_if_newer(std::move(rc), timestamp);
|
||||||
|
|
||||||
// TODO: if we have a "trusted" relay we've been fetching from, this will be
|
|
||||||
// an incremental update to the RC list, so *after* insertion we check if the
|
|
||||||
// RCs' RouterIDs closely match our trusted RouterID list.
|
|
||||||
|
|
||||||
last_rc_update_relay_timestamp = timestamp;
|
last_rc_update_relay_timestamp = timestamp;
|
||||||
|
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
|
@ -213,13 +212,14 @@ namespace llarp
|
||||||
{
|
{
|
||||||
const auto& rid = source.router_id();
|
const auto& rid = source.router_id();
|
||||||
|
|
||||||
router_id_fetch_responses[rid] = std::move(ids);
|
fetch_rid_responses[rid] = std::move(ids);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: trust model
|
||||||
bool
|
bool
|
||||||
NodeDB::process_fetched_rids()
|
NodeDB::process_fetched_rids()
|
||||||
{
|
{
|
||||||
for (const auto& [rid, responses] : router_id_fetch_responses)
|
for (const auto& [rid, responses] : fetch_rid_responses)
|
||||||
{
|
{
|
||||||
// TODO: empty == failure, handle that case
|
// TODO: empty == failure, handle that case
|
||||||
for (const auto& response : responses)
|
for (const auto& response : responses)
|
||||||
|
@ -227,15 +227,36 @@ namespace llarp
|
||||||
active_client_routers.insert(std::move(response));
|
active_client_routers.insert(std::move(response));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
router_id_fetch_in_progress = false;
|
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
NodeDB::fetch_rcs(int n_fails, bool initial)
|
NodeDB::fetch_initial()
|
||||||
|
{
|
||||||
|
int num_fails = 0;
|
||||||
|
|
||||||
|
// fetch_initial_{rcs,router_ids} return false when num_fails == 0
|
||||||
|
if (fetch_initial_rcs(num_fails))
|
||||||
|
{
|
||||||
|
_router.last_rc_fetch = llarp::time_point_now();
|
||||||
|
|
||||||
|
if (fetch_initial_router_ids(num_fails))
|
||||||
|
{
|
||||||
|
_router.last_rid_fetch = llarp::time_point_now();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// failure case
|
||||||
|
// TODO: use bootstrap here!
|
||||||
|
}
|
||||||
|
|
||||||
|
bool
|
||||||
|
NodeDB::fetch_initial_rcs(int n_fails)
|
||||||
{
|
{
|
||||||
int num_failures = n_fails;
|
int num_failures = n_fails;
|
||||||
|
is_fetching_rcs = true;
|
||||||
std::vector<RouterID> needed;
|
std::vector<RouterID> needed;
|
||||||
const auto now = time_point_now();
|
const auto now = time_point_now();
|
||||||
|
|
||||||
|
@ -245,9 +266,8 @@ namespace llarp
|
||||||
needed.push_back(rid);
|
needed.push_back(rid);
|
||||||
}
|
}
|
||||||
|
|
||||||
RouterID src = (initial)
|
RouterID src =
|
||||||
? *std::next(active_client_routers.begin(), csrng() % active_client_routers.size())
|
*std::next(active_client_routers.begin(), csrng() % active_client_routers.size());
|
||||||
: rc_fetch_source;
|
|
||||||
|
|
||||||
while (num_failures < MAX_FETCH_ATTEMPTS)
|
while (num_failures < MAX_FETCH_ATTEMPTS)
|
||||||
{
|
{
|
||||||
|
@ -285,7 +305,7 @@ namespace llarp
|
||||||
rcs.emplace_back(btlc.consume_dict_consumer());
|
rcs.emplace_back(btlc.consume_dict_consumer());
|
||||||
}
|
}
|
||||||
|
|
||||||
store_fetched_rcs(src, std::move(rcs), timestamp);
|
process_fetched_rcs(src, std::move(rcs), timestamp);
|
||||||
p->set_value(true);
|
p->set_value(true);
|
||||||
}
|
}
|
||||||
catch (const std::exception& e)
|
catch (const std::exception& e)
|
||||||
|
@ -299,9 +319,8 @@ namespace llarp
|
||||||
if (f.get())
|
if (f.get())
|
||||||
{
|
{
|
||||||
log::debug(logcat, "Successfully fetched RC's from {}", src);
|
log::debug(logcat, "Successfully fetched RC's from {}", src);
|
||||||
rc_fetch_source = src;
|
fetch_source = src;
|
||||||
assert(_router.link_manager().have_connection_to(src));
|
return true;
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
++num_failures;
|
++num_failures;
|
||||||
|
@ -312,34 +331,25 @@ namespace llarp
|
||||||
num_failures,
|
num_failures,
|
||||||
MAX_FETCH_ATTEMPTS);
|
MAX_FETCH_ATTEMPTS);
|
||||||
|
|
||||||
src = (initial)
|
src = *std::next(active_client_routers.begin(), csrng() % active_client_routers.size());
|
||||||
? *std::next(active_client_routers.begin(), csrng() % active_client_routers.size())
|
|
||||||
: std::next(known_rcs.begin(), csrng() % known_rcs.size())->first;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
bool
|
||||||
NodeDB::fetch_router_ids(int n_fails, bool initial)
|
NodeDB::fetch_initial_router_ids(int n_fails)
|
||||||
{
|
{
|
||||||
assert(not router_id_fetch_in_progress);
|
assert(not is_fetching_rids);
|
||||||
|
|
||||||
int num_failures = n_fails;
|
int num_failures = n_fails;
|
||||||
|
select_router_id_sources();
|
||||||
|
|
||||||
if (router_id_fetch_sources.empty())
|
is_fetching_rids = true;
|
||||||
select_router_id_sources();
|
fetch_rid_responses.clear();
|
||||||
|
|
||||||
// if we *still* don't have fetch sources, we can't exactly fetch...
|
RouterID src =
|
||||||
if (router_id_fetch_sources.empty())
|
*std::next(active_client_routers.begin(), csrng() % active_client_routers.size());
|
||||||
{
|
|
||||||
log::info(logcat, "Attempting to fetch RouterIDs, but have no source from which to do so.");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
router_id_fetch_in_progress = true;
|
|
||||||
router_id_fetch_responses.clear();
|
|
||||||
|
|
||||||
RouterID src = (initial)
|
|
||||||
? *std::next(active_client_routers.begin(), csrng() % active_client_routers.size())
|
|
||||||
: rc_fetch_source;
|
|
||||||
|
|
||||||
std::unordered_set<RouterID> fails;
|
std::unordered_set<RouterID> fails;
|
||||||
|
|
||||||
|
@ -350,7 +360,7 @@ namespace llarp
|
||||||
auto f = success->get_future();
|
auto f = success->get_future();
|
||||||
fails.clear();
|
fails.clear();
|
||||||
|
|
||||||
for (const auto& target : router_id_fetch_sources)
|
for (const auto& target : rid_sources)
|
||||||
{
|
{
|
||||||
_router.link_manager().fetch_router_ids(
|
_router.link_manager().fetch_router_ids(
|
||||||
src,
|
src,
|
||||||
|
@ -420,11 +430,9 @@ namespace llarp
|
||||||
default:
|
default:
|
||||||
// RC node failed to relay our routerID request; re-select RC node and continue
|
// RC node failed to relay our routerID request; re-select RC node and continue
|
||||||
log::debug(logcat, "RC source {} failed to mediate RID fetching from {}", src, target);
|
log::debug(logcat, "RC source {} failed to mediate RID fetching from {}", src, target);
|
||||||
src = (initial)
|
src = *std::next(active_client_routers.begin(), csrng() % active_client_routers.size());
|
||||||
? *std::next(active_client_routers.begin(), csrng() % active_client_routers.size())
|
|
||||||
: std::next(known_rcs.begin(), csrng() % known_rcs.size())->first;
|
|
||||||
++num_failures;
|
++num_failures;
|
||||||
fetch_rcs(num_failures);
|
fetch_rcs();
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -438,19 +446,18 @@ namespace llarp
|
||||||
"RID fetching was successful ({}/{} acceptable errors)",
|
"RID fetching was successful ({}/{} acceptable errors)",
|
||||||
fails.size(),
|
fails.size(),
|
||||||
MAX_RID_ERRORS);
|
MAX_RID_ERRORS);
|
||||||
rc_fetch_source = src;
|
fetch_source = src;
|
||||||
assert(_router.link_manager().have_connection_to(src));
|
|
||||||
|
|
||||||
// this is where the trust model will do verification based on the similarity of the sets
|
// this is where the trust model will do verification based on the similarity of the sets
|
||||||
if (process_fetched_rids())
|
if (process_fetched_rids())
|
||||||
{
|
{
|
||||||
log::debug(logcat, "Accumulated RID's accepted by trust model");
|
log::debug(logcat, "Accumulated RID's accepted by trust model");
|
||||||
return;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
log::debug(
|
log::debug(
|
||||||
logcat, "Accumulated RID's rejected by trust model, reselecting all RID sources...");
|
logcat, "Accumulated RID's rejected by trust model, reselecting all RID sources...");
|
||||||
select_router_id_sources(router_id_fetch_sources);
|
select_router_id_sources(rid_sources);
|
||||||
++num_failures;
|
++num_failures;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -461,6 +468,271 @@ namespace llarp
|
||||||
++num_failures;
|
++num_failures;
|
||||||
select_router_id_sources(fails);
|
select_router_id_sources(fails);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
NodeDB::fetch_rcs()
|
||||||
|
{
|
||||||
|
auto& num_failures = fetch_failures;
|
||||||
|
|
||||||
|
// base case; this function is called recursively
|
||||||
|
if (num_failures > MAX_FETCH_ATTEMPTS)
|
||||||
|
{
|
||||||
|
fetch_rcs_result(true);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
is_fetching_rcs = true;
|
||||||
|
|
||||||
|
std::vector<RouterID> needed;
|
||||||
|
const auto now = time_point_now();
|
||||||
|
|
||||||
|
for (const auto& [rid, rc] : known_rcs)
|
||||||
|
{
|
||||||
|
if (now - rc.timestamp() > RouterContact::OUTDATED_AGE)
|
||||||
|
needed.push_back(rid);
|
||||||
|
}
|
||||||
|
|
||||||
|
RouterID& src = fetch_source;
|
||||||
|
|
||||||
|
_router.link_manager().fetch_rcs(
|
||||||
|
src,
|
||||||
|
RCFetchMessage::serialize(last_rc_update_relay_timestamp, needed),
|
||||||
|
[this, src](oxen::quic::message m) mutable {
|
||||||
|
if (m.timed_out)
|
||||||
|
{
|
||||||
|
log::info(logcat, "RC fetch to {} timed out", src);
|
||||||
|
fetch_rcs_result(true);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
try
|
||||||
|
{
|
||||||
|
oxenc::bt_dict_consumer btdc{m.body()};
|
||||||
|
if (not m)
|
||||||
|
{
|
||||||
|
auto reason = btdc.require<std::string_view>(messages::STATUS_KEY);
|
||||||
|
log::info(logcat, "RC fetch to {} returned error: {}", src, reason);
|
||||||
|
fetch_rcs_result(true);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto btlc = btdc.require<oxenc::bt_list_consumer>("rcs"sv);
|
||||||
|
auto timestamp = rc_time{std::chrono::seconds{btdc.require<int64_t>("time"sv)}};
|
||||||
|
|
||||||
|
std::vector<RemoteRC> rcs;
|
||||||
|
|
||||||
|
while (not btlc.is_finished())
|
||||||
|
{
|
||||||
|
rcs.emplace_back(btlc.consume_dict_consumer());
|
||||||
|
}
|
||||||
|
|
||||||
|
// if process_fetched_rcs returns false, then the trust model rejected the fetched RC's
|
||||||
|
fetch_rcs_result(not process_fetched_rcs(src, std::move(rcs), timestamp));
|
||||||
|
}
|
||||||
|
catch (const std::exception& e)
|
||||||
|
{
|
||||||
|
log::info(logcat, "Failed to parse RC fetch response from {}: {}", src, e.what());
|
||||||
|
fetch_rcs_result(true);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
NodeDB::fetch_rcs_result(bool error)
|
||||||
|
{
|
||||||
|
if (error)
|
||||||
|
{
|
||||||
|
++fetch_failures;
|
||||||
|
|
||||||
|
if (fetch_failures > MAX_FETCH_ATTEMPTS)
|
||||||
|
{
|
||||||
|
log::info(
|
||||||
|
logcat,
|
||||||
|
"Failed {} attempts to fetch RC's from {}; reverting to bootstrap...",
|
||||||
|
MAX_FETCH_ATTEMPTS,
|
||||||
|
fetch_source);
|
||||||
|
// TODO: revert to bootstrap
|
||||||
|
// set rc_fetch_source to bootstrap and try again!
|
||||||
|
}
|
||||||
|
else
|
||||||
|
// find new non-bootstrap RC fetch source and try again buddy
|
||||||
|
fetch_source = std::next(known_rcs.begin(), csrng() % known_rcs.size())->first;
|
||||||
|
|
||||||
|
fetch_rcs();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
log::debug(logcat, "Successfully fetched RC's from {}", fetch_source);
|
||||||
|
post_fetch_rcs();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
NodeDB::post_fetch_rcs()
|
||||||
|
{
|
||||||
|
is_fetching_rcs = false;
|
||||||
|
_router.last_rc_fetch = llarp::time_point_now();
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: differentiate between errors from the relay node vs errors from the target nodes
|
||||||
|
void
|
||||||
|
NodeDB::fetch_router_ids()
|
||||||
|
{
|
||||||
|
auto& num_failures = fetch_failures;
|
||||||
|
|
||||||
|
// base case; this function is called recursively
|
||||||
|
if (num_failures > MAX_FETCH_ATTEMPTS)
|
||||||
|
{
|
||||||
|
fetch_rids_result(fetch_source, true);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (rid_sources.empty())
|
||||||
|
select_router_id_sources();
|
||||||
|
|
||||||
|
// if we *still* don't have fetch sources, we can't exactly fetch...
|
||||||
|
if (rid_sources.empty())
|
||||||
|
{
|
||||||
|
log::error(logcat, "Attempting to fetch RouterIDs, but have no source from which to do so.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
is_fetching_rids = true;
|
||||||
|
fetch_rid_responses.clear();
|
||||||
|
|
||||||
|
RouterID& src = fetch_source;
|
||||||
|
RemoteRC& src_rc = known_rcs[src];
|
||||||
|
|
||||||
|
for (const auto& target : rid_sources)
|
||||||
|
{
|
||||||
|
_router.link_manager().fetch_router_ids(
|
||||||
|
src,
|
||||||
|
RouterIDFetch::serialize(target),
|
||||||
|
[this, src, src_rc, target](oxen::quic::message m) mutable {
|
||||||
|
if (not m)
|
||||||
|
{
|
||||||
|
log::info(link_cat, "RID fetch from {} via {} timed out", src, target);
|
||||||
|
|
||||||
|
ingest_rid_fetch_responses(src_rc);
|
||||||
|
fetch_rids_result(src, true);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
oxenc::bt_dict_consumer btdc{m.body()};
|
||||||
|
|
||||||
|
btdc.required("routers");
|
||||||
|
auto router_id_strings = btdc.consume_list<std::vector<ustring>>();
|
||||||
|
|
||||||
|
btdc.require_signature("signature", [&src](ustring_view msg, ustring_view sig) {
|
||||||
|
if (sig.size() != 64)
|
||||||
|
throw std::runtime_error{"Invalid signature: not 64 bytes"};
|
||||||
|
if (not crypto::verify(src, msg, sig))
|
||||||
|
throw std::runtime_error{
|
||||||
|
"Failed to verify signature for fetch RouterIDs response."};
|
||||||
|
});
|
||||||
|
|
||||||
|
std::vector<RouterID> router_ids;
|
||||||
|
|
||||||
|
for (const auto& s : router_id_strings)
|
||||||
|
{
|
||||||
|
if (s.size() != RouterID::SIZE)
|
||||||
|
{
|
||||||
|
log::warning(
|
||||||
|
link_cat, "RID fetch from {} via {} returned bad RouterID", target, src);
|
||||||
|
ingest_rid_fetch_responses(src_rc);
|
||||||
|
fetch_rids_result(src, true);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
router_ids.emplace_back(s.data());
|
||||||
|
}
|
||||||
|
|
||||||
|
ingest_rid_fetch_responses(src_rc, std::move(router_ids));
|
||||||
|
fetch_rids_result(src);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
catch (const std::exception& e)
|
||||||
|
{
|
||||||
|
log::info(link_cat, "Error handling fetch RouterIDs response: {}", e.what());
|
||||||
|
ingest_rid_fetch_responses(src_rc);
|
||||||
|
fetch_rids_result(src, true);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
NodeDB::fetch_rids_result(const RouterID& target, bool error)
|
||||||
|
{
|
||||||
|
if (error)
|
||||||
|
{
|
||||||
|
fail_sources.insert(target);
|
||||||
|
++fetch_failures;
|
||||||
|
|
||||||
|
if (fetch_failures > MAX_FETCH_ATTEMPTS)
|
||||||
|
{
|
||||||
|
log::info(
|
||||||
|
logcat,
|
||||||
|
"Failed {} attempts to fetch RID's from {}; reverting to bootstrap...",
|
||||||
|
MAX_FETCH_ATTEMPTS,
|
||||||
|
fetch_source);
|
||||||
|
// TODO: revert to bootstrap
|
||||||
|
// set rc_fetch_source to bootstrap and START OVER!
|
||||||
|
}
|
||||||
|
else
|
||||||
|
// find new non-bootstrap RC fetch source and try again buddy
|
||||||
|
fetch_source = std::next(known_rcs.begin(), csrng() % known_rcs.size())->first;
|
||||||
|
|
||||||
|
fetch_router_ids();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
log::debug(logcat, "Successfully fetched RID's from {}", fetch_source);
|
||||||
|
auto n_fails = fail_sources.size();
|
||||||
|
|
||||||
|
if (n_fails <= MAX_RID_ERRORS)
|
||||||
|
{
|
||||||
|
log::debug(
|
||||||
|
logcat, "RID fetching was successful ({}/{} acceptable errors)", n_fails, MAX_RID_ERRORS);
|
||||||
|
|
||||||
|
// this is where the trust model will do verification based on the similarity of the sets
|
||||||
|
if (process_fetched_rids())
|
||||||
|
{
|
||||||
|
log::debug(logcat, "Accumulated RID's accepted by trust model");
|
||||||
|
post_fetch_rids();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
log::debug(
|
||||||
|
logcat, "Accumulated RID's rejected by trust model, reselecting all RID sources...");
|
||||||
|
select_router_id_sources(rid_sources);
|
||||||
|
++fetch_failures;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// we had 4 or more failed requests, so we will need to rotate our rid sources
|
||||||
|
log::debug(
|
||||||
|
logcat, "RID fetching found {} failures; reselecting failed RID sources...", n_fails);
|
||||||
|
++fetch_failures;
|
||||||
|
select_router_id_sources(fail_sources);
|
||||||
|
}
|
||||||
|
|
||||||
|
fetch_router_ids();
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
NodeDB::post_fetch_rids()
|
||||||
|
{
|
||||||
|
is_fetching_rids = false;
|
||||||
|
fetch_rid_responses.clear();
|
||||||
|
fail_sources.clear();
|
||||||
|
fetch_failures = 0;
|
||||||
|
_router.last_rid_fetch = llarp::time_point_now();
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
|
@ -471,9 +743,17 @@ namespace llarp
|
||||||
if (active_client_routers.empty())
|
if (active_client_routers.empty())
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
// in case we pass the entire list
|
||||||
|
std::unordered_set<RouterID> temp = rid_sources;
|
||||||
|
|
||||||
// keep using any we've been using, but remove `excluded` ones
|
// keep using any we've been using, but remove `excluded` ones
|
||||||
for (const auto& r : excluded)
|
if (excluded == rid_sources)
|
||||||
router_id_fetch_sources.erase(r);
|
temp.clear();
|
||||||
|
else
|
||||||
|
{
|
||||||
|
for (const auto& r : excluded)
|
||||||
|
temp.erase(r);
|
||||||
|
}
|
||||||
|
|
||||||
// only know so many routers, so no need to randomize
|
// only know so many routers, so no need to randomize
|
||||||
if (active_client_routers.size() <= (ROUTER_ID_SOURCE_COUNT + excluded.size()))
|
if (active_client_routers.size() <= (ROUTER_ID_SOURCE_COUNT + excluded.size()))
|
||||||
|
@ -482,18 +762,20 @@ namespace llarp
|
||||||
{
|
{
|
||||||
if (excluded.count(r))
|
if (excluded.count(r))
|
||||||
continue;
|
continue;
|
||||||
router_id_fetch_sources.insert(r);
|
temp.insert(r);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// select at random until we have chosen enough
|
// select at random until we have chosen enough
|
||||||
while (router_id_fetch_sources.size() < ROUTER_ID_SOURCE_COUNT)
|
while (temp.size() < ROUTER_ID_SOURCE_COUNT)
|
||||||
{
|
{
|
||||||
RouterID r;
|
RouterID r;
|
||||||
std::sample(active_client_routers.begin(), active_client_routers.end(), &r, 1, csrng);
|
std::sample(active_client_routers.begin(), active_client_routers.end(), &r, 1, csrng);
|
||||||
if (excluded.count(r) == 0)
|
if (excluded.count(r) == 0)
|
||||||
router_id_fetch_sources.insert(r);
|
temp.insert(r);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rid_sources.swap(temp);
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
|
|
|
@ -66,15 +66,19 @@ namespace llarp
|
||||||
std::unordered_set<RouterID> pinned_edges;
|
std::unordered_set<RouterID> pinned_edges;
|
||||||
|
|
||||||
// rc update info: we only set this upon a SUCCESSFUL fetching
|
// rc update info: we only set this upon a SUCCESSFUL fetching
|
||||||
RouterID rc_fetch_source;
|
RouterID fetch_source;
|
||||||
|
|
||||||
rc_time last_rc_update_relay_timestamp;
|
rc_time last_rc_update_relay_timestamp;
|
||||||
|
|
||||||
std::unordered_set<RouterID> router_id_fetch_sources;
|
std::unordered_set<RouterID> rid_sources;
|
||||||
|
|
||||||
|
std::unordered_set<RouterID> fail_sources;
|
||||||
|
|
||||||
// process responses once all are received (or failed/timed out)
|
// process responses once all are received (or failed/timed out)
|
||||||
std::unordered_map<RouterID, std::vector<RouterID>> router_id_fetch_responses;
|
std::unordered_map<RouterID, std::vector<RouterID>> fetch_rid_responses;
|
||||||
bool router_id_fetch_in_progress{false};
|
|
||||||
|
std::atomic<bool> is_fetching_rids{false}, is_fetching_rcs{false};
|
||||||
|
std::atomic<int> fetch_failures{0};
|
||||||
|
|
||||||
bool
|
bool
|
||||||
want_rc(const RouterID& rid) const;
|
want_rc(const RouterID& rid) const;
|
||||||
|
@ -135,8 +139,8 @@ namespace llarp
|
||||||
bool
|
bool
|
||||||
rotate_startup_rc_source();
|
rotate_startup_rc_source();
|
||||||
|
|
||||||
void
|
bool
|
||||||
store_fetched_rcs(RouterID source, std::vector<RemoteRC> rcs, rc_time timestamp);
|
process_fetched_rcs(RouterID source, std::vector<RemoteRC> rcs, rc_time timestamp);
|
||||||
|
|
||||||
void
|
void
|
||||||
ingest_rid_fetch_responses(const RemoteRC& source, std::vector<RouterID> ids = {});
|
ingest_rid_fetch_responses(const RemoteRC& source, std::vector<RouterID> ids = {});
|
||||||
|
@ -144,14 +148,32 @@ namespace llarp
|
||||||
bool
|
bool
|
||||||
process_fetched_rids();
|
process_fetched_rids();
|
||||||
|
|
||||||
|
void
|
||||||
|
fetch_initial();
|
||||||
|
|
||||||
bool
|
bool
|
||||||
fetch_initial_rcs(const RouterID& src);
|
fetch_initial_rcs(int n_fails = 0);
|
||||||
|
|
||||||
|
bool
|
||||||
|
fetch_initial_router_ids(int n_fails = 0);
|
||||||
|
|
||||||
void
|
void
|
||||||
fetch_rcs(int n_fails = 0, bool initial = false);
|
fetch_rcs();
|
||||||
|
|
||||||
void
|
void
|
||||||
fetch_router_ids(int n_fails = 0, bool initial = false);
|
fetch_rcs_result(bool error = false);
|
||||||
|
|
||||||
|
void
|
||||||
|
fetch_router_ids();
|
||||||
|
|
||||||
|
void
|
||||||
|
post_fetch_rcs();
|
||||||
|
|
||||||
|
void
|
||||||
|
post_fetch_rids();
|
||||||
|
|
||||||
|
void
|
||||||
|
fetch_rids_result(const RouterID& target, bool error = false);
|
||||||
|
|
||||||
void
|
void
|
||||||
select_router_id_sources(std::unordered_set<RouterID> excluded = {});
|
select_router_id_sources(std::unordered_set<RouterID> excluded = {});
|
||||||
|
|
|
@ -851,29 +851,24 @@ namespace llarp
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
int num_failures = 0;
|
|
||||||
|
|
||||||
if (needs_initial_fetch)
|
if (needs_initial_fetch)
|
||||||
{
|
{
|
||||||
node_db()->fetch_rcs(num_failures, true);
|
node_db()->fetch_initial();
|
||||||
last_rc_fetch = now_timepoint;
|
|
||||||
node_db()->fetch_router_ids(num_failures, true);
|
|
||||||
last_routerid_fetch = now_timepoint;
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
// (client-only) periodically fetch updated RCs
|
// (client-only) periodically fetch updated RCs
|
||||||
if (now_timepoint - last_rc_fetch > RC_UPDATE_INTERVAL)
|
if (now_timepoint - last_rc_fetch > RC_UPDATE_INTERVAL)
|
||||||
{
|
{
|
||||||
node_db()->fetch_rcs(num_failures);
|
node_db()->fetch_rcs();
|
||||||
last_rc_fetch = now_timepoint;
|
last_rc_fetch = now_timepoint;
|
||||||
}
|
}
|
||||||
|
|
||||||
// (client-only) periodically fetch updated RouterID list
|
// (client-only) periodically fetch updated RouterID list
|
||||||
if (now_timepoint - last_routerid_fetch > ROUTERID_UPDATE_INTERVAL)
|
if (now_timepoint - last_rid_fetch > ROUTERID_UPDATE_INTERVAL)
|
||||||
{
|
{
|
||||||
node_db()->fetch_router_ids(num_failures);
|
node_db()->fetch_router_ids();
|
||||||
last_routerid_fetch = now_timepoint;
|
last_rid_fetch = now_timepoint;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -55,6 +55,8 @@ namespace llarp
|
||||||
|
|
||||||
struct Router : std::enable_shared_from_this<Router>
|
struct Router : std::enable_shared_from_this<Router>
|
||||||
{
|
{
|
||||||
|
friend class NodeDB;
|
||||||
|
|
||||||
explicit Router(EventLoop_ptr loop, std::shared_ptr<vpn::Platform> vpnPlatform);
|
explicit Router(EventLoop_ptr loop, std::shared_ptr<vpn::Platform> vpnPlatform);
|
||||||
|
|
||||||
~Router();
|
~Router();
|
||||||
|
@ -121,12 +123,6 @@ namespace llarp
|
||||||
|
|
||||||
bool needs_initial_fetch{true};
|
bool needs_initial_fetch{true};
|
||||||
|
|
||||||
std::chrono::system_clock::time_point last_rc_gossip{
|
|
||||||
std::chrono::system_clock::time_point::min()};
|
|
||||||
std::chrono::system_clock::time_point next_rc_gossip{last_rc_gossip};
|
|
||||||
std::chrono::system_clock::time_point last_rc_fetch{last_rc_gossip};
|
|
||||||
std::chrono::system_clock::time_point last_routerid_fetch{last_rc_gossip};
|
|
||||||
|
|
||||||
// should we be sending padded messages every interval?
|
// should we be sending padded messages every interval?
|
||||||
bool send_padding = false;
|
bool send_padding = false;
|
||||||
|
|
||||||
|
@ -149,6 +145,13 @@ namespace llarp
|
||||||
bool
|
bool
|
||||||
insufficient_peers() const;
|
insufficient_peers() const;
|
||||||
|
|
||||||
|
protected:
|
||||||
|
std::chrono::system_clock::time_point last_rc_gossip{
|
||||||
|
std::chrono::system_clock::time_point::min()};
|
||||||
|
std::chrono::system_clock::time_point next_rc_gossip{last_rc_gossip};
|
||||||
|
std::chrono::system_clock::time_point last_rc_fetch{last_rc_gossip};
|
||||||
|
std::chrono::system_clock::time_point last_rid_fetch{last_rc_gossip};
|
||||||
|
|
||||||
public:
|
public:
|
||||||
void
|
void
|
||||||
for_each_connection(std::function<void(link::Connection&)> func);
|
for_each_connection(std::function<void(link::Connection&)> func);
|
||||||
|
|
Loading…
Reference in a new issue