diff --git a/llarp/link/link_manager.cpp b/llarp/link/link_manager.cpp index dda9a666f..ccbeb7539 100644 --- a/llarp/link/link_manager.cpp +++ b/llarp/link/link_manager.cpp @@ -457,6 +457,14 @@ namespace llarp auto since_time = rc_time{std::chrono::seconds{btdc.require("since")}}; + std::unordered_set explicit_relays; + + // Initial fetch: give me all the RC's + if (explicit_ids.empty()) + { + // TODO: this + } + if (explicit_ids.size() > (rcs.size() / 4)) { log::info( @@ -465,8 +473,6 @@ namespace llarp return; } - std::unordered_set explicit_relays; - for (auto& sv : explicit_ids) { if (sv.size() != RouterID::SIZE) @@ -478,10 +484,10 @@ namespace llarp explicit_relays.emplace(reinterpret_cast(sv.data())); } - oxenc::bt_dict_producer resp; + oxenc::bt_dict_producer btdp; { - auto rc_bt_list = resp.append_list("rcs"); + auto rc_sublist = btdp.append_list("rcs"); const auto& last_time = node_db->get_last_rc_update_times(); @@ -492,13 +498,13 @@ namespace llarp for (const auto& [_, rc] : rcs) { if (last_time.at(rc.router_id()) > since_time or explicit_relays.count(rc.router_id())) - rc_bt_list.append_encoded(rc.view()); + rc_sublist.append_encoded(rc.view()); } } - resp.append("time", now.time_since_epoch().count()); + btdp.append("time", now.time_since_epoch().count()); - m.respond(std::move(resp).str()); + m.respond(std::move(btdp).str()); } catch (const std::exception& e) { diff --git a/llarp/nodedb.cpp b/llarp/nodedb.cpp index 31834ed50..38b930b02 100644 --- a/llarp/nodedb.cpp +++ b/llarp/nodedb.cpp @@ -125,12 +125,12 @@ namespace llarp return false; } - RouterID temp = rc_fetch_source; + RouterID temp = fetch_source; - while (temp == rc_fetch_source) + while (temp == fetch_source) std::sample(active_client_routers.begin(), active_client_routers.end(), &temp, 1, csrng); - rc_fetch_source = std::move(temp); + fetch_source = std::move(temp); return true; } @@ -156,56 +156,55 @@ namespace llarp if (conn_count == 1) { // if we only have one connection, it must be current rc fetch source - assert(new_source.router_id() == rc_fetch_source); + assert(new_source.router_id() == fetch_source); if (pinned_edges.size() == 1) { // only one pinned edge set, use it even though it gave unsatisfactory RCs - assert(rc_fetch_source == *(pinned_edges.begin())); + assert(fetch_source == *(pinned_edges.begin())); log::warning( logcat, "Single pinned edge {} gave bad RC response; still using it despite this.", - rc_fetch_source); + fetch_source); return; } // only one connection, choose a new relay to connect to for rc fetching - RouterID r = rc_fetch_source; + RouterID r = fetch_source; - while (r == rc_fetch_source) + while (r == fetch_source) { std::sample(active_client_routers.begin(), active_client_routers.end(), &r, 1, csrng); } - rc_fetch_source = std::move(r); + fetch_source = std::move(r); return; } // choose one of our other existing connections to use as the RC fetch source - while (new_source.router_id() == rc_fetch_source) + while (new_source.router_id() == fetch_source) { _router.link_manager().get_random_connected(new_source); } - rc_fetch_source = new_source.router_id(); + + fetch_source = new_source.router_id(); } // TODO: trust model - void - NodeDB::store_fetched_rcs(RouterID source, std::vector rcs, rc_time timestamp) + bool + NodeDB::process_fetched_rcs(RouterID source, std::vector rcs, rc_time timestamp) { - (void)source; + fetch_source = source; - // TODO: if we don't currently have a "trusted" relay we've been fetching from, - // this will be a full list of RCs. We need to first check if it aligns closely - // with our trusted RouterID list, then replace our RCs with the incoming set. + /* + TODO: trust model analyzing returned list of RCs + */ for (auto& rc : rcs) put_rc_if_newer(std::move(rc), timestamp); - // TODO: if we have a "trusted" relay we've been fetching from, this will be - // an incremental update to the RC list, so *after* insertion we check if the - // RCs' RouterIDs closely match our trusted RouterID list. - last_rc_update_relay_timestamp = timestamp; + + return true; } void @@ -213,13 +212,14 @@ namespace llarp { const auto& rid = source.router_id(); - router_id_fetch_responses[rid] = std::move(ids); + fetch_rid_responses[rid] = std::move(ids); } + // TODO: trust model bool NodeDB::process_fetched_rids() { - for (const auto& [rid, responses] : router_id_fetch_responses) + for (const auto& [rid, responses] : fetch_rid_responses) { // TODO: empty == failure, handle that case for (const auto& response : responses) @@ -227,15 +227,36 @@ namespace llarp active_client_routers.insert(std::move(response)); } } - router_id_fetch_in_progress = false; return true; } void - NodeDB::fetch_rcs(int n_fails, bool initial) + NodeDB::fetch_initial() + { + int num_fails = 0; + + // fetch_initial_{rcs,router_ids} return false when num_fails == 0 + if (fetch_initial_rcs(num_fails)) + { + _router.last_rc_fetch = llarp::time_point_now(); + + if (fetch_initial_router_ids(num_fails)) + { + _router.last_rid_fetch = llarp::time_point_now(); + return; + } + } + + // failure case + // TODO: use bootstrap here! + } + + bool + NodeDB::fetch_initial_rcs(int n_fails) { int num_failures = n_fails; + is_fetching_rcs = true; std::vector needed; const auto now = time_point_now(); @@ -245,9 +266,8 @@ namespace llarp needed.push_back(rid); } - RouterID src = (initial) - ? *std::next(active_client_routers.begin(), csrng() % active_client_routers.size()) - : rc_fetch_source; + RouterID src = + *std::next(active_client_routers.begin(), csrng() % active_client_routers.size()); while (num_failures < MAX_FETCH_ATTEMPTS) { @@ -285,7 +305,7 @@ namespace llarp rcs.emplace_back(btlc.consume_dict_consumer()); } - store_fetched_rcs(src, std::move(rcs), timestamp); + process_fetched_rcs(src, std::move(rcs), timestamp); p->set_value(true); } catch (const std::exception& e) @@ -299,9 +319,8 @@ namespace llarp if (f.get()) { log::debug(logcat, "Successfully fetched RC's from {}", src); - rc_fetch_source = src; - assert(_router.link_manager().have_connection_to(src)); - break; + fetch_source = src; + return true; } ++num_failures; @@ -312,34 +331,25 @@ namespace llarp num_failures, MAX_FETCH_ATTEMPTS); - src = (initial) - ? *std::next(active_client_routers.begin(), csrng() % active_client_routers.size()) - : std::next(known_rcs.begin(), csrng() % known_rcs.size())->first; + src = *std::next(active_client_routers.begin(), csrng() % active_client_routers.size()); } + + return false; } - void - NodeDB::fetch_router_ids(int n_fails, bool initial) + bool + NodeDB::fetch_initial_router_ids(int n_fails) { - assert(not router_id_fetch_in_progress); + assert(not is_fetching_rids); + int num_failures = n_fails; + select_router_id_sources(); - if (router_id_fetch_sources.empty()) - select_router_id_sources(); + is_fetching_rids = true; + fetch_rid_responses.clear(); - // if we *still* don't have fetch sources, we can't exactly fetch... - if (router_id_fetch_sources.empty()) - { - log::info(logcat, "Attempting to fetch RouterIDs, but have no source from which to do so."); - return; - } - - router_id_fetch_in_progress = true; - router_id_fetch_responses.clear(); - - RouterID src = (initial) - ? *std::next(active_client_routers.begin(), csrng() % active_client_routers.size()) - : rc_fetch_source; + RouterID src = + *std::next(active_client_routers.begin(), csrng() % active_client_routers.size()); std::unordered_set fails; @@ -350,7 +360,7 @@ namespace llarp auto f = success->get_future(); fails.clear(); - for (const auto& target : router_id_fetch_sources) + for (const auto& target : rid_sources) { _router.link_manager().fetch_router_ids( src, @@ -420,11 +430,9 @@ namespace llarp default: // RC node failed to relay our routerID request; re-select RC node and continue log::debug(logcat, "RC source {} failed to mediate RID fetching from {}", src, target); - src = (initial) - ? *std::next(active_client_routers.begin(), csrng() % active_client_routers.size()) - : std::next(known_rcs.begin(), csrng() % known_rcs.size())->first; + src = *std::next(active_client_routers.begin(), csrng() % active_client_routers.size()); ++num_failures; - fetch_rcs(num_failures); + fetch_rcs(); continue; } } @@ -438,19 +446,18 @@ namespace llarp "RID fetching was successful ({}/{} acceptable errors)", fails.size(), MAX_RID_ERRORS); - rc_fetch_source = src; - assert(_router.link_manager().have_connection_to(src)); + fetch_source = src; // this is where the trust model will do verification based on the similarity of the sets if (process_fetched_rids()) { log::debug(logcat, "Accumulated RID's accepted by trust model"); - return; + return true; } log::debug( logcat, "Accumulated RID's rejected by trust model, reselecting all RID sources..."); - select_router_id_sources(router_id_fetch_sources); + select_router_id_sources(rid_sources); ++num_failures; continue; } @@ -461,6 +468,271 @@ namespace llarp ++num_failures; select_router_id_sources(fails); } + + return false; + } + + void + NodeDB::fetch_rcs() + { + auto& num_failures = fetch_failures; + + // base case; this function is called recursively + if (num_failures > MAX_FETCH_ATTEMPTS) + { + fetch_rcs_result(true); + return; + } + + is_fetching_rcs = true; + + std::vector needed; + const auto now = time_point_now(); + + for (const auto& [rid, rc] : known_rcs) + { + if (now - rc.timestamp() > RouterContact::OUTDATED_AGE) + needed.push_back(rid); + } + + RouterID& src = fetch_source; + + _router.link_manager().fetch_rcs( + src, + RCFetchMessage::serialize(last_rc_update_relay_timestamp, needed), + [this, src](oxen::quic::message m) mutable { + if (m.timed_out) + { + log::info(logcat, "RC fetch to {} timed out", src); + fetch_rcs_result(true); + return; + } + try + { + oxenc::bt_dict_consumer btdc{m.body()}; + if (not m) + { + auto reason = btdc.require(messages::STATUS_KEY); + log::info(logcat, "RC fetch to {} returned error: {}", src, reason); + fetch_rcs_result(true); + return; + } + + auto btlc = btdc.require("rcs"sv); + auto timestamp = rc_time{std::chrono::seconds{btdc.require("time"sv)}}; + + std::vector rcs; + + while (not btlc.is_finished()) + { + rcs.emplace_back(btlc.consume_dict_consumer()); + } + + // if process_fetched_rcs returns false, then the trust model rejected the fetched RC's + fetch_rcs_result(not process_fetched_rcs(src, std::move(rcs), timestamp)); + } + catch (const std::exception& e) + { + log::info(logcat, "Failed to parse RC fetch response from {}: {}", src, e.what()); + fetch_rcs_result(true); + return; + } + }); + } + + void + NodeDB::fetch_rcs_result(bool error) + { + if (error) + { + ++fetch_failures; + + if (fetch_failures > MAX_FETCH_ATTEMPTS) + { + log::info( + logcat, + "Failed {} attempts to fetch RC's from {}; reverting to bootstrap...", + MAX_FETCH_ATTEMPTS, + fetch_source); + // TODO: revert to bootstrap + // set rc_fetch_source to bootstrap and try again! + } + else + // find new non-bootstrap RC fetch source and try again buddy + fetch_source = std::next(known_rcs.begin(), csrng() % known_rcs.size())->first; + + fetch_rcs(); + } + else + { + log::debug(logcat, "Successfully fetched RC's from {}", fetch_source); + post_fetch_rcs(); + } + } + + void + NodeDB::post_fetch_rcs() + { + is_fetching_rcs = false; + _router.last_rc_fetch = llarp::time_point_now(); + } + + // TODO: differentiate between errors from the relay node vs errors from the target nodes + void + NodeDB::fetch_router_ids() + { + auto& num_failures = fetch_failures; + + // base case; this function is called recursively + if (num_failures > MAX_FETCH_ATTEMPTS) + { + fetch_rids_result(fetch_source, true); + return; + } + + if (rid_sources.empty()) + select_router_id_sources(); + + // if we *still* don't have fetch sources, we can't exactly fetch... + if (rid_sources.empty()) + { + log::error(logcat, "Attempting to fetch RouterIDs, but have no source from which to do so."); + return; + } + + is_fetching_rids = true; + fetch_rid_responses.clear(); + + RouterID& src = fetch_source; + RemoteRC& src_rc = known_rcs[src]; + + for (const auto& target : rid_sources) + { + _router.link_manager().fetch_router_ids( + src, + RouterIDFetch::serialize(target), + [this, src, src_rc, target](oxen::quic::message m) mutable { + if (not m) + { + log::info(link_cat, "RID fetch from {} via {} timed out", src, target); + + ingest_rid_fetch_responses(src_rc); + fetch_rids_result(src, true); + return; + } + + try + { + oxenc::bt_dict_consumer btdc{m.body()}; + + btdc.required("routers"); + auto router_id_strings = btdc.consume_list>(); + + btdc.require_signature("signature", [&src](ustring_view msg, ustring_view sig) { + if (sig.size() != 64) + throw std::runtime_error{"Invalid signature: not 64 bytes"}; + if (not crypto::verify(src, msg, sig)) + throw std::runtime_error{ + "Failed to verify signature for fetch RouterIDs response."}; + }); + + std::vector router_ids; + + for (const auto& s : router_id_strings) + { + if (s.size() != RouterID::SIZE) + { + log::warning( + link_cat, "RID fetch from {} via {} returned bad RouterID", target, src); + ingest_rid_fetch_responses(src_rc); + fetch_rids_result(src, true); + return; + } + + router_ids.emplace_back(s.data()); + } + + ingest_rid_fetch_responses(src_rc, std::move(router_ids)); + fetch_rids_result(src); + return; + } + catch (const std::exception& e) + { + log::info(link_cat, "Error handling fetch RouterIDs response: {}", e.what()); + ingest_rid_fetch_responses(src_rc); + fetch_rids_result(src, true); + } + }); + } + } + + void + NodeDB::fetch_rids_result(const RouterID& target, bool error) + { + if (error) + { + fail_sources.insert(target); + ++fetch_failures; + + if (fetch_failures > MAX_FETCH_ATTEMPTS) + { + log::info( + logcat, + "Failed {} attempts to fetch RID's from {}; reverting to bootstrap...", + MAX_FETCH_ATTEMPTS, + fetch_source); + // TODO: revert to bootstrap + // set rc_fetch_source to bootstrap and START OVER! + } + else + // find new non-bootstrap RC fetch source and try again buddy + fetch_source = std::next(known_rcs.begin(), csrng() % known_rcs.size())->first; + + fetch_router_ids(); + return; + } + + log::debug(logcat, "Successfully fetched RID's from {}", fetch_source); + auto n_fails = fail_sources.size(); + + if (n_fails <= MAX_RID_ERRORS) + { + log::debug( + logcat, "RID fetching was successful ({}/{} acceptable errors)", n_fails, MAX_RID_ERRORS); + + // this is where the trust model will do verification based on the similarity of the sets + if (process_fetched_rids()) + { + log::debug(logcat, "Accumulated RID's accepted by trust model"); + post_fetch_rids(); + return; + } + + log::debug( + logcat, "Accumulated RID's rejected by trust model, reselecting all RID sources..."); + select_router_id_sources(rid_sources); + ++fetch_failures; + } + else + { + // we had 4 or more failed requests, so we will need to rotate our rid sources + log::debug( + logcat, "RID fetching found {} failures; reselecting failed RID sources...", n_fails); + ++fetch_failures; + select_router_id_sources(fail_sources); + } + + fetch_router_ids(); + } + + void + NodeDB::post_fetch_rids() + { + is_fetching_rids = false; + fetch_rid_responses.clear(); + fail_sources.clear(); + fetch_failures = 0; + _router.last_rid_fetch = llarp::time_point_now(); } void @@ -471,9 +743,17 @@ namespace llarp if (active_client_routers.empty()) return; + // in case we pass the entire list + std::unordered_set temp = rid_sources; + // keep using any we've been using, but remove `excluded` ones - for (const auto& r : excluded) - router_id_fetch_sources.erase(r); + if (excluded == rid_sources) + temp.clear(); + else + { + for (const auto& r : excluded) + temp.erase(r); + } // only know so many routers, so no need to randomize if (active_client_routers.size() <= (ROUTER_ID_SOURCE_COUNT + excluded.size())) @@ -482,18 +762,20 @@ namespace llarp { if (excluded.count(r)) continue; - router_id_fetch_sources.insert(r); + temp.insert(r); } } // select at random until we have chosen enough - while (router_id_fetch_sources.size() < ROUTER_ID_SOURCE_COUNT) + while (temp.size() < ROUTER_ID_SOURCE_COUNT) { RouterID r; std::sample(active_client_routers.begin(), active_client_routers.end(), &r, 1, csrng); if (excluded.count(r) == 0) - router_id_fetch_sources.insert(r); + temp.insert(r); } + + rid_sources.swap(temp); } void diff --git a/llarp/nodedb.hpp b/llarp/nodedb.hpp index 8b6b540dd..df5fb1ee8 100644 --- a/llarp/nodedb.hpp +++ b/llarp/nodedb.hpp @@ -66,15 +66,19 @@ namespace llarp std::unordered_set pinned_edges; // rc update info: we only set this upon a SUCCESSFUL fetching - RouterID rc_fetch_source; + RouterID fetch_source; rc_time last_rc_update_relay_timestamp; - std::unordered_set router_id_fetch_sources; + std::unordered_set rid_sources; + + std::unordered_set fail_sources; // process responses once all are received (or failed/timed out) - std::unordered_map> router_id_fetch_responses; - bool router_id_fetch_in_progress{false}; + std::unordered_map> fetch_rid_responses; + + std::atomic is_fetching_rids{false}, is_fetching_rcs{false}; + std::atomic fetch_failures{0}; bool want_rc(const RouterID& rid) const; @@ -135,8 +139,8 @@ namespace llarp bool rotate_startup_rc_source(); - void - store_fetched_rcs(RouterID source, std::vector rcs, rc_time timestamp); + bool + process_fetched_rcs(RouterID source, std::vector rcs, rc_time timestamp); void ingest_rid_fetch_responses(const RemoteRC& source, std::vector ids = {}); @@ -144,14 +148,32 @@ namespace llarp bool process_fetched_rids(); + void + fetch_initial(); + bool - fetch_initial_rcs(const RouterID& src); + fetch_initial_rcs(int n_fails = 0); + + bool + fetch_initial_router_ids(int n_fails = 0); void - fetch_rcs(int n_fails = 0, bool initial = false); + fetch_rcs(); void - fetch_router_ids(int n_fails = 0, bool initial = false); + fetch_rcs_result(bool error = false); + + void + fetch_router_ids(); + + void + post_fetch_rcs(); + + void + post_fetch_rids(); + + void + fetch_rids_result(const RouterID& target, bool error = false); void select_router_id_sources(std::unordered_set excluded = {}); diff --git a/llarp/router/router.cpp b/llarp/router/router.cpp index 6a50d2f7d..e4f0cafaf 100644 --- a/llarp/router/router.cpp +++ b/llarp/router/router.cpp @@ -851,29 +851,24 @@ namespace llarp } else { - int num_failures = 0; - if (needs_initial_fetch) { - node_db()->fetch_rcs(num_failures, true); - last_rc_fetch = now_timepoint; - node_db()->fetch_router_ids(num_failures, true); - last_routerid_fetch = now_timepoint; + node_db()->fetch_initial(); } else { // (client-only) periodically fetch updated RCs if (now_timepoint - last_rc_fetch > RC_UPDATE_INTERVAL) { - node_db()->fetch_rcs(num_failures); + node_db()->fetch_rcs(); last_rc_fetch = now_timepoint; } // (client-only) periodically fetch updated RouterID list - if (now_timepoint - last_routerid_fetch > ROUTERID_UPDATE_INTERVAL) + if (now_timepoint - last_rid_fetch > ROUTERID_UPDATE_INTERVAL) { - node_db()->fetch_router_ids(num_failures); - last_routerid_fetch = now_timepoint; + node_db()->fetch_router_ids(); + last_rid_fetch = now_timepoint; } } } diff --git a/llarp/router/router.hpp b/llarp/router/router.hpp index 90a8a24b4..44e9f40a9 100644 --- a/llarp/router/router.hpp +++ b/llarp/router/router.hpp @@ -55,6 +55,8 @@ namespace llarp struct Router : std::enable_shared_from_this { + friend class NodeDB; + explicit Router(EventLoop_ptr loop, std::shared_ptr vpnPlatform); ~Router(); @@ -121,12 +123,6 @@ namespace llarp bool needs_initial_fetch{true}; - std::chrono::system_clock::time_point last_rc_gossip{ - std::chrono::system_clock::time_point::min()}; - std::chrono::system_clock::time_point next_rc_gossip{last_rc_gossip}; - std::chrono::system_clock::time_point last_rc_fetch{last_rc_gossip}; - std::chrono::system_clock::time_point last_routerid_fetch{last_rc_gossip}; - // should we be sending padded messages every interval? bool send_padding = false; @@ -149,6 +145,13 @@ namespace llarp bool insufficient_peers() const; + protected: + std::chrono::system_clock::time_point last_rc_gossip{ + std::chrono::system_clock::time_point::min()}; + std::chrono::system_clock::time_point next_rc_gossip{last_rc_gossip}; + std::chrono::system_clock::time_point last_rc_fetch{last_rc_gossip}; + std::chrono::system_clock::time_point last_rid_fetch{last_rc_gossip}; + public: void for_each_connection(std::function func);