1
1
Fork 0
mirror of https://github.com/oxen-io/lokinet synced 2023-12-14 06:53:00 +01:00

Merge pull request #1122 from jagerman/deabseil-mutexes

De-abseil, part 2: mutex, locks, (most) time
This commit is contained in:
Jeff 2020-02-24 08:34:43 -05:00 committed by GitHub
commit 60f92f2f45
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
122 changed files with 492 additions and 8602 deletions

View file

@ -3,7 +3,6 @@
#include <util/logging/logger.hpp>
#include <util/logging/ostream_logger.hpp>
#include <absl/synchronization/mutex.h>
#include <cxxopts.hpp>
#include <string>
#include <vector>
@ -110,10 +109,6 @@ namespace
int
main(int argc, char* argv[])
{
#ifdef LOKINET_DEBUG
absl::SetMutexDeadlockDetectionMode(absl::OnDeadlockCycle::kAbort);
#endif
// clang-format off
cxxopts::Options options(
"lokinetctl",

View file

@ -102,10 +102,6 @@ main(int argc, char *argv[])
// SetUnhandledExceptionFilter(win32_signal_handler);
#endif
#ifdef LOKINET_DEBUG
absl::SetMutexDeadlockDetectionMode(absl::OnDeadlockCycle::kAbort);
#endif
// clang-format off
cxxopts::Options options(
"lokinet",

View file

@ -23,38 +23,19 @@ namespace llarp
struct Config;
struct Crypto;
struct CryptoManager;
struct MetricsConfig;
struct RouterContact;
namespace thread
{
class ThreadPool;
}
namespace metrics
{
class DefaultManagerGuard;
class PublisherScheduler;
} // namespace metrics
namespace thread
{
class Scheduler;
}
struct Context
{
/// get context from main pointer
static Context *
Get(llarp_main *);
Context();
~Context();
// These come first, in this order.
// This ensures we get metric collection on shutdown
std::unique_ptr< thread::Scheduler > m_scheduler;
std::unique_ptr< metrics::DefaultManagerGuard > m_metricsManager;
std::unique_ptr< metrics::PublisherScheduler > m_metricsPublisher;
Context() = default;
std::unique_ptr< Crypto > crypto;
std::unique_ptr< CryptoManager > cryptoManager;
@ -123,9 +104,6 @@ namespace llarp
bool
ReloadConfig();
void
setupMetrics(const MetricsConfig &metricsConfig);
std::string configfile;
std::string pidfile;
std::unique_ptr< std::promise< void > > closeWaiter;

View file

@ -4,21 +4,11 @@ set(LIB_UTIL_SRC
config/config.cpp
config/ini.cpp
config/key_manager.cpp
constants/defaults.cpp
constants/limits.cpp
constants/link_layer.cpp
constants/path.cpp
constants/proto.cpp
${CMAKE_CURRENT_BINARY_DIR}/constants/version.cpp
util/aligned.cpp
util/bencode.cpp
util/bits.cpp
util/buffer.cpp
util/codel.cpp
util/common.cpp
util/encode.cpp
util/endian.cpp
util/decaying_hashset.cpp
util/fs.cpp
util/json.cpp
util/logging/android_logger.cpp
@ -31,32 +21,14 @@ set(LIB_UTIL_SRC
util/logging/win32_logger.cpp
util/lokinet_init.c
util/mem.cpp
util/meta/memfn_traits.cpp
util/meta/memfn.cpp
util/meta/object.cpp
util/meta/traits.cpp
util/meta/variant.cpp
util/metrics/core.cpp
util/metrics/json_publisher.cpp
util/metrics/metrics.cpp
util/metrics/metrictank_publisher.cpp
util/metrics/stream_publisher.cpp
util/metrics/types.cpp
util/printer.cpp
util/status.cpp
util/stopwatch.cpp
util/str.cpp
util/string_view.cpp
util/thread/logic.cpp
util/thread/queue_manager.cpp
util/thread/queue.cpp
util/thread/scheduler.cpp
util/thread/thread_pool.cpp
util/thread/threading.cpp
util/thread/threadpool.cpp
util/thread/timerqueue.cpp
util/time.cpp
util/types.cpp
)
add_library(${UTIL_LIB} STATIC ${LIB_UTIL_SRC})
@ -70,7 +42,7 @@ endif()
target_link_libraries(${UTIL_LIB} PUBLIC ${CRYPTOGRAPHY_LIB} ${LOG_LIB} ${CURL_LIBRARIES})
target_link_libraries(${UTIL_LIB} PUBLIC
absl::synchronization absl::flat_hash_map absl::container
absl::time absl::hash
nlohmann_json::nlohmann_json
ghc_filesystem
optional-lite
@ -78,7 +50,7 @@ target_link_libraries(${UTIL_LIB} PUBLIC
# cut back on fluff
if (NOT WIN32)
target_link_libraries(${UTIL_LIB} PUBLIC absl::variant absl::strings)
target_link_libraries(${UTIL_LIB} PUBLIC absl::strings)
endif(NOT WIN32)
if(${CMAKE_SYSTEM_NAME} MATCHES "Linux")
@ -126,15 +98,12 @@ if(WIN32)
target_link_libraries(${PLATFORM_LIB} PUBLIC iphlpapi)
endif()
set(DNSLIB_SRC
dns/dns.cpp
dns/message.cpp
dns/name.cpp
dns/query.cpp
dns/question.cpp
dns/rr.cpp
dns/serialize.cpp
dns/server.cpp
dns/string.cpp
)
set(CONSENSUS_SRC
@ -146,19 +115,13 @@ set(LIB_SRC
${DNSLIB_SRC}
bootstrap.cpp
context.cpp
crypto/constants.cpp
crypto/crypto_libsodium.cpp
crypto/crypto_noop.cpp
crypto/crypto.cpp
crypto/encrypted_frame.cpp
crypto/encrypted.cpp
crypto/types.cpp
dht/bucket.cpp
dht/context.cpp
dht/dht.cpp
dht/explorenetworkjob.cpp
dht/kademlia.cpp
dht/key.cpp
dht/localtaglookup.cpp
dht/localrouterlookup.cpp
dht/localserviceaddresslookup.cpp
@ -168,21 +131,16 @@ set(LIB_SRC
dht/messages/gotintro.cpp
dht/messages/gotrouter.cpp
dht/messages/pubintro.cpp
dht/node.cpp
dht/publishservicejob.cpp
dht/recursiverouterlookup.cpp
dht/serviceaddresslookup.cpp
dht/taglookup.cpp
dht/tx.cpp
dht/txholder.cpp
dht/txowner.cpp
exit/context.cpp
exit/endpoint.cpp
exit/exit_messages.cpp
exit/policy.cpp
exit/session.cpp
handlers/exit.cpp
handlers/null.cpp
handlers/tun.cpp
hook/shell.cpp
iwp/iwp.cpp
@ -190,15 +148,11 @@ set(LIB_SRC
iwp/message_buffer.cpp
iwp/session.cpp
link/factory.cpp
link/i_link_manager.cpp
link/link_manager.cpp
link/server.cpp
link/session.cpp
messages/dht_immediate.cpp
messages/discard.cpp
messages/link_intro.cpp
messages/link_message_parser.cpp
messages/link_message.cpp
messages/relay.cpp
messages/relay_commit.cpp
messages/relay_status.cpp
@ -207,30 +161,22 @@ set(LIB_SRC
nodedb.cpp
path/ihophandler.cpp
path/path_context.cpp
path/path_types.cpp
path/path.cpp
path/pathbuilder.cpp
path/pathset.cpp
path/transit_hop.cpp
pow.cpp
profiling.cpp
router/abstractrouter.cpp
router/i_outbound_message_handler.cpp
router/outbound_message_handler.cpp
router/i_outbound_session_maker.cpp
router/outbound_session_maker.cpp
router/i_rc_lookup_handler.cpp
router/rc_lookup_handler.cpp
router/i_gossiper.cpp
router/rc_gossiper.cpp
router/router.cpp
router_contact.cpp
router_id.cpp
router_version.cpp
routing/dht_message.cpp
routing/handler.cpp
routing/message_parser.cpp
routing/message.cpp
routing/path_confirm_message.cpp
routing/path_latency_message.cpp
routing/path_transfer_message.cpp
@ -243,7 +189,6 @@ set(LIB_SRC
service/endpoint_state.cpp
service/endpoint_util.cpp
service/endpoint.cpp
service/handler.cpp
service/hidden_service_address_lookup.cpp
service/identity.cpp
service/info.cpp
@ -251,14 +196,12 @@ set(LIB_SRC
service/intro.cpp
service/lookup.cpp
service/outbound_context.cpp
service/pendingbuffer.cpp
service/protocol.cpp
service/router_lookup_job.cpp
service/sendcontext.cpp
service/session.cpp
service/tag_lookup_job.cpp
service/tag.cpp
service/vanity.cpp
)
if(TESTNET)
set(LIB_SRC ${LIB_SRC} testnet.c)

View file

@ -9,7 +9,6 @@
#include <util/logging/logger_syslog.hpp>
#include <util/logging/logger.hpp>
#include <util/mem.hpp>
#include <util/meta/memfn.hpp>
#include <util/str.hpp>
#include <util/lokinet_init.h>

View file

@ -1 +0,0 @@
#include <constants/defaults.hpp>

View file

@ -1 +0,0 @@
#include <constants/link_layer.hpp>

View file

@ -1 +0,0 @@
#include <constants/path.hpp>

View file

@ -1 +0,0 @@
#include <constants/proto.hpp>

View file

@ -11,12 +11,6 @@
#include <router/router.hpp>
#include <service/context.hpp>
#include <util/logging/logger.h>
#include <util/meta/memfn.hpp>
#include <util/metrics/json_publisher.hpp>
#include <util/metrics/metrics.hpp>
#include <util/metrics/metrictank_publisher.hpp>
#include <util/metrics/stream_publisher.hpp>
#include <util/thread/scheduler.hpp>
#include <absl/strings/str_split.h>
#include <cxxopts.hpp>
@ -28,14 +22,6 @@
namespace llarp
{
Context::Context() = default;
Context::~Context()
{
if(m_scheduler)
m_scheduler->stop();
}
bool
Context::CallSafe(std::function< void(void) > f)
{
@ -73,90 +59,9 @@ namespace llarp
nodedb_dir = config->netdb.nodedbDir();
if(!config->metrics.disableMetrics)
{
auto &metricsConfig = config->metrics;
auto &tags = metricsConfig.metricTags;
tags["netid"] = config->router.netId();
tags["nickname"] = config->router.nickname();
setupMetrics(metricsConfig);
if(!config->metrics.disableMetricLogs)
{
m_metricsManager->instance()->addGlobalPublisher(
std::make_shared< metrics::StreamPublisher >(std::cerr));
}
}
return true;
}
void
Context::setupMetrics(const MetricsConfig &metricsConfig)
{
if(!m_scheduler)
{
m_scheduler = std::make_unique< thread::Scheduler >();
}
if(!m_metricsManager)
{
m_metricsManager = std::make_unique< metrics::DefaultManagerGuard >();
}
if(!m_metricsPublisher)
{
m_metricsPublisher = std::make_unique< metrics::PublisherScheduler >(
*m_scheduler, m_metricsManager->instance());
}
if(!metricsConfig.jsonMetricsPath.native().empty())
{
m_metricsManager->instance()->addGlobalPublisher(
std::make_shared< metrics::JsonPublisher >(
std::bind(&metrics::JsonPublisher::directoryPublisher,
std::placeholders::_1, metricsConfig.jsonMetricsPath)));
}
if(!metricsConfig.metricTankHost.empty())
{
if(std::getenv("LOKINET_ENABLE_METRIC_TANK"))
{
static std::string WARNING = R"(
__ ___ ____ _ _ ___ _ _ ____
\ \ / / \ | _ \| \ | |_ _| \ | |/ ___|
\ \ /\ / / _ \ | |_) | \| || || \| | | _
\ V V / ___ \| _ <| |\ || || |\ | |_| |
\_/\_/_/ \_\_| \_\_| \_|___|_| \_|\____|
This Lokinet session is not private!!
Sending connection metrics to metrictank!!
__ ___ ____ _ _ ___ _ _ ____
\ \ / / \ | _ \| \ | |_ _| \ | |/ ___|
\ \ /\ / / _ \ | |_) | \| || || \| | | _
\ V V / ___ \| _ <| |\ || || |\ | |_| |
\_/\_/_/ \_\_| \_\_| \_|___|_| \_|\____|
)";
std::cerr << WARNING << '\n';
std::pair< std::string, std::string > split =
absl::StrSplit(metricsConfig.metricTankHost, ':');
m_metricsManager->instance()->addGlobalPublisher(
std::make_shared< metrics::MetricTankPublisher >(
metricsConfig.metricTags, split.first, stoi(split.second)));
}
else
{
std::cerr << "metrictank host specified, but "
"LOKINET_ENABLE_METRIC_TANK not set, skipping\n";
}
}
m_metricsPublisher->setDefault(absl::Seconds(30));
m_scheduler->start();
}
void
Context::SetPIDFile(const std::string &fname)
{

View file

@ -1 +0,0 @@
#include <crypto/constants.hpp>

View file

@ -1 +0,0 @@
#include <crypto/crypto_noop.hpp>

View file

@ -1 +0,0 @@
#include <crypto/encrypted.hpp>

View file

@ -1 +0,0 @@
#include <dht/bucket.hpp>

View file

@ -1 +0,0 @@
#include <dht/kademlia.hpp>

View file

@ -1 +0,0 @@
#include <dht/key.hpp>

View file

@ -1 +0,0 @@
#include <dht/node.hpp>

View file

@ -1 +0,0 @@
#include <dht/tx.hpp>

View file

@ -1 +0,0 @@
#include <dht/txholder.hpp>

View file

@ -1 +0,0 @@
#include <dht/txowner.hpp>

View file

@ -1 +0,0 @@
#include <dns/dns.hpp>

View file

@ -1 +0,0 @@
#include <dns/query.hpp>

View file

@ -1 +0,0 @@
#include <dns/string.hpp>

View file

@ -1 +0,0 @@
#include <handlers/null.hpp>

View file

@ -2,7 +2,6 @@
#include <iwp/linklayer.hpp>
#include <memory>
#include <router/abstractrouter.hpp>
#include <util/meta/memfn.hpp>
namespace llarp
{

View file

@ -48,7 +48,7 @@ namespace llarp
bool isNewSession = false;
if(itr == m_AuthedAddrs.end())
{
ACQUIRE_LOCK(Lock_t lock, m_PendingMutex);
Lock_t lock(m_PendingMutex);
if(m_Pending.count(from) == 0)
{
if(not permitInbound)
@ -60,7 +60,7 @@ namespace llarp
}
else
{
ACQUIRE_LOCK(Lock_t lock, m_AuthedLinksMutex);
Lock_t lock(m_AuthedLinksMutex);
auto range = m_AuthedLinks.equal_range(itr->second);
session = range.first->second;
}

View file

@ -1 +0,0 @@
#include <link/i_link_manager.hpp>

View file

@ -76,7 +76,7 @@ namespace llarp
void
LinkManager::AddLink(LinkLayer_ptr link, bool inbound)
{
util::Lock l(&_mutex);
util::Lock l(_mutex);
if(inbound)
{
@ -128,7 +128,7 @@ namespace llarp
return;
}
util::Lock l(&_mutex);
util::Lock l(_mutex);
LogInfo("stopping links");
stopping = true;
@ -145,7 +145,7 @@ namespace llarp
if(stopping)
return;
util::Lock l(&_mutex);
util::Lock l(_mutex);
m_PersistingSessions[remote] =
std::max(until, m_PersistingSessions[remote]);
@ -297,7 +297,7 @@ namespace llarp
std::vector< RouterID > sessionsNeeded;
{
util::Lock l(&_mutex);
util::Lock l(_mutex);
auto itr = m_PersistingSessions.begin();
while(itr != m_PersistingSessions.end())

View file

@ -38,7 +38,7 @@ namespace llarp
bool
ILinkLayer::HasSessionTo(const RouterID& id)
{
ACQUIRE_LOCK(Lock_t l, m_AuthedLinksMutex);
Lock_t l(m_AuthedLinksMutex);
return m_AuthedLinks.find(id) != m_AuthedLinks.end();
}
@ -48,7 +48,7 @@ namespace llarp
{
std::vector< std::shared_ptr< ILinkSession > > sessions;
{
ACQUIRE_LOCK(Lock_t l, m_AuthedLinksMutex);
Lock_t l(m_AuthedLinksMutex);
if(m_AuthedLinks.size() == 0)
return;
const size_t sz = randint() % m_AuthedLinks.size();
@ -84,7 +84,7 @@ namespace llarp
{
std::shared_ptr< ILinkSession > session;
{
ACQUIRE_LOCK(Lock_t l, m_AuthedLinksMutex);
Lock_t l(m_AuthedLinksMutex);
auto itr = m_AuthedLinks.find(pk);
if(itr == m_AuthedLinks.end())
return false;
@ -98,7 +98,7 @@ namespace llarp
{
std::vector< std::shared_ptr< ILinkSession > > sessions;
{
ACQUIRE_LOCK(Lock_t l, m_AuthedLinksMutex);
Lock_t l(m_AuthedLinksMutex);
auto itr = m_AuthedLinks.begin();
while(itr != m_AuthedLinks.end())
{
@ -136,7 +136,7 @@ namespace llarp
std::vector< std::shared_ptr< ILinkSession > > closedPending;
auto _now = Now();
{
ACQUIRE_LOCK(Lock_t l, m_AuthedLinksMutex);
Lock_t l(m_AuthedLinksMutex);
auto itr = m_AuthedLinks.begin();
while(itr != m_AuthedLinks.end())
{
@ -156,7 +156,7 @@ namespace llarp
}
}
{
ACQUIRE_LOCK(Lock_t l, m_PendingMutex);
Lock_t l(m_PendingMutex);
auto itr = m_Pending.begin();
while(itr != m_Pending.end())
@ -176,7 +176,7 @@ namespace llarp
}
}
{
ACQUIRE_LOCK(Lock_t l, m_AuthedLinksMutex);
Lock_t l(m_AuthedLinksMutex);
for(const auto& r : closedSessions)
{
if(m_AuthedLinks.count(r) == 0)
@ -196,8 +196,8 @@ namespace llarp
bool
ILinkLayer::MapAddr(const RouterID& pk, ILinkSession* s)
{
ACQUIRE_LOCK(Lock_t l_authed, m_AuthedLinksMutex);
ACQUIRE_LOCK(Lock_t l_pending, m_PendingMutex);
Lock_t l_authed(m_AuthedLinksMutex);
Lock_t l_pending(m_PendingMutex);
llarp::Addr addr = s->GetRemoteEndpoint();
auto itr = m_Pending.find(addr);
if(itr != m_Pending.end())
@ -237,7 +237,7 @@ namespace llarp
std::vector< util::StatusObject > pending, established;
{
ACQUIRE_LOCK(Lock_t l, m_PendingMutex);
Lock_t l(m_PendingMutex);
std::transform(m_Pending.cbegin(), m_Pending.cend(),
std::back_inserter(pending),
[](const auto& item) -> util::StatusObject {
@ -245,7 +245,7 @@ namespace llarp
});
}
{
ACQUIRE_LOCK(Lock_t l, m_AuthedLinksMutex);
Lock_t l(m_AuthedLinksMutex);
std::transform(m_AuthedLinks.cbegin(), m_AuthedLinks.cend(),
std::back_inserter(established),
[](const auto& item) -> util::StatusObject {
@ -265,7 +265,7 @@ namespace llarp
ILinkLayer::TryEstablishTo(RouterContact rc)
{
{
ACQUIRE_LOCK(Lock_t l, m_AuthedLinksMutex);
Lock_t l(m_AuthedLinksMutex);
if(m_AuthedLinks.count(rc.pubkey) >= MaxSessionsPerKey)
return false;
}
@ -274,7 +274,7 @@ namespace llarp
return false;
const llarp::Addr addr(to);
{
ACQUIRE_LOCK(Lock_t l, m_PendingMutex);
Lock_t l(m_PendingMutex);
if(m_Pending.count(addr) >= MaxSessionsPerKey)
return false;
}
@ -301,7 +301,7 @@ namespace llarp
ILinkLayer::Tick(llarp_time_t now)
{
{
ACQUIRE_LOCK(Lock_t l, m_AuthedLinksMutex);
Lock_t l(m_AuthedLinksMutex);
auto itr = m_AuthedLinks.begin();
while(itr != m_AuthedLinks.end())
{
@ -311,7 +311,7 @@ namespace llarp
}
{
ACQUIRE_LOCK(Lock_t l, m_PendingMutex);
Lock_t l(m_PendingMutex);
auto itr = m_Pending.begin();
while(itr != m_Pending.end())
{
@ -338,7 +338,7 @@ namespace llarp
if(m_Logic && tick_id)
m_Logic->remove_call(tick_id);
{
ACQUIRE_LOCK(Lock_t l, m_AuthedLinksMutex);
Lock_t l(m_AuthedLinksMutex);
auto itr = m_AuthedLinks.begin();
while(itr != m_AuthedLinks.end())
{
@ -347,7 +347,7 @@ namespace llarp
}
}
{
ACQUIRE_LOCK(Lock_t l, m_PendingMutex);
Lock_t l(m_PendingMutex);
auto itr = m_Pending.begin();
while(itr != m_Pending.end())
{
@ -362,7 +362,7 @@ namespace llarp
{
static constexpr llarp_time_t CloseGraceWindow = 500;
const auto now = Now();
ACQUIRE_LOCK(Lock_t l, m_AuthedLinksMutex);
Lock_t l(m_AuthedLinksMutex);
RouterID r = remote;
llarp::LogInfo("Closing all to ", r);
auto range = m_AuthedLinks.equal_range(r);
@ -379,7 +379,7 @@ namespace llarp
void
ILinkLayer::KeepAliveSessionTo(const RouterID& remote)
{
ACQUIRE_LOCK(Lock_t l, m_AuthedLinksMutex);
Lock_t l(m_AuthedLinksMutex);
auto range = m_AuthedLinks.equal_range(remote);
auto itr = range.first;
while(itr != range.second)
@ -396,7 +396,7 @@ namespace llarp
{
std::shared_ptr< ILinkSession > s;
{
ACQUIRE_LOCK(Lock_t l, m_AuthedLinksMutex);
Lock_t l(m_AuthedLinksMutex);
auto range = m_AuthedLinks.equal_range(remote);
auto itr = range.first;
// pick lowest backlog session
@ -445,7 +445,7 @@ namespace llarp
ILinkLayer::PutSession(const std::shared_ptr< ILinkSession >& s)
{
static constexpr size_t MaxSessionsPerEndpoint = 5;
ACQUIRE_LOCK(Lock_t lock, m_PendingMutex);
Lock_t lock(m_PendingMutex);
llarp::Addr addr = s->GetRemoteEndpoint();
if(m_Pending.count(addr) >= MaxSessionsPerEndpoint)
return false;

View file

@ -74,12 +74,11 @@ namespace llarp
void
ForEachSession(std::function< void(const ILinkSession*) > visit,
bool randomize = false) const
LOCKS_EXCLUDED(m_AuthedLinksMutex);
bool randomize = false) const EXCLUDES(m_AuthedLinksMutex);
void
ForEachSession(std::function< void(ILinkSession*) > visit)
LOCKS_EXCLUDED(m_AuthedLinksMutex);
EXCLUDES(m_AuthedLinksMutex);
static void
udp_tick(llarp_udp_io* udp);
@ -120,7 +119,7 @@ namespace llarp
Name() const = 0;
util::StatusObject
ExtractStatus() const LOCKS_EXCLUDED(m_AuthedLinksMutex);
ExtractStatus() const EXCLUDES(m_AuthedLinksMutex);
void
CloseSessionTo(const RouterID& remote);
@ -138,7 +137,7 @@ namespace llarp
bool
VisitSessionByPubkey(const RouterID& pk,
std::function< bool(ILinkSession*) > visit)
LOCKS_EXCLUDED(m_AuthedLinksMutex);
EXCLUDES(m_AuthedLinksMutex);
virtual uint16_t
Rank() const = 0;
@ -196,13 +195,13 @@ namespace llarp
/// called by link session to remove a pending session who is timed out
// void
// RemovePending(ILinkSession* s) LOCKS_EXCLUDED(m_PendingMutex);
// RemovePending(ILinkSession* s) EXCLUDES(m_PendingMutex);
/// count the number of sessions that are yet to be fully connected
size_t
NumberOfPendingSessions() const
{
ACQUIRE_LOCK(Lock_t lock, m_PendingMutex);
Lock_t lock(m_PendingMutex);
return m_Pending.size();
}

View file

@ -1 +0,0 @@
#include <link/session.hpp>

View file

@ -1 +0,0 @@
#include <messages/discard.hpp>

View file

@ -1 +0,0 @@
#include <messages/link_message.hpp>

View file

@ -10,7 +10,6 @@
#include <router_contact.hpp>
#include <util/buffer.hpp>
#include <util/logging/logger.hpp>
#include <util/metrics/metrics.hpp>
#include <memory>
@ -69,12 +68,10 @@ namespace llarp
}
// create the message to parse based off message type
llarp::LogDebug("inbound message ", *strbuf.cur);
bool isLIM = false;
switch(*strbuf.cur)
{
case 'i':
msg = &holder->i;
isLIM = true;
msg = &holder->i;
break;
case 'd':
msg = &holder->d;
@ -98,12 +95,6 @@ namespace llarp
return false;
}
if(!isLIM)
{
metrics::integerTick(msg->Name(), "RX", 1, "id",
RouterID(from->GetPubKey()).ToString());
}
msg->session = from;
firstkey = false;
return true;

View file

@ -43,14 +43,14 @@ llarp_nodedb::Remove(const llarp::RouterID &pk)
void
llarp_nodedb::Clear()
{
llarp::util::Lock lock(&access);
llarp::util::Lock lock(access);
entries.clear();
}
bool
llarp_nodedb::Get(const llarp::RouterID &pk, llarp::RouterContact &result)
{
llarp::util::Lock l(&access);
llarp::util::Lock l(access);
auto itr = entries.find(pk);
if(itr == entries.end())
return false;
@ -71,7 +71,7 @@ llarp_nodedb::RemoveIf(
{
std::set< std::string > files;
{
llarp::util::Lock l(&access);
llarp::util::Lock l(access);
auto itr = entries.begin();
while(itr != entries.end())
{
@ -91,7 +91,7 @@ llarp_nodedb::RemoveIf(
bool
llarp_nodedb::Has(const llarp::RouterID &pk)
{
llarp::util::Lock lock(&access);
llarp::util::Lock lock(access);
return entries.find(pk) != entries.end();
}
@ -118,7 +118,7 @@ std::vector< llarp::RouterContact >
llarp_nodedb::FindClosestTo(const llarp::dht::Key_t &location,
uint32_t numRouters)
{
llarp::util::Lock lock(&access);
llarp::util::Lock lock(access);
std::vector< const llarp::RouterContact * > all;
all.reserve(entries.size());
@ -180,7 +180,7 @@ llarp_nodedb::UpdateAsyncIfNewer(llarp::RouterContact rc,
std::shared_ptr< llarp::Logic > logic,
std::function< void(void) > completionHandler)
{
llarp::util::Lock lock(&access);
llarp::util::Lock lock(access);
auto itr = entries.find(rc.pubkey);
if(itr == entries.end() || itr->second.rc.OtherIsNewer(rc))
{
@ -201,7 +201,7 @@ llarp_nodedb::UpdateAsyncIfNewer(llarp::RouterContact rc,
bool
llarp_nodedb::Insert(const llarp::RouterContact &rc)
{
llarp::util::Lock lock(&access);
llarp::util::Lock lock(access);
auto itr = entries.find(rc.pubkey.as_array());
if(itr != entries.end())
entries.erase(itr);
@ -241,7 +241,7 @@ void
llarp_nodedb::SaveAll()
{
std::array< byte_t, MAX_RC_SIZE > tmp;
llarp::util::Lock lock(&access);
llarp::util::Lock lock(access);
for(const auto &item : entries)
{
llarp_buffer_t buf(tmp);
@ -307,7 +307,7 @@ llarp_nodedb::loadfile(const fs::path &fpath)
return false;
}
{
llarp::util::Lock lock(&access);
llarp::util::Lock lock(access);
entries.emplace(rc.pubkey.as_array(), rc);
}
return true;
@ -316,7 +316,7 @@ llarp_nodedb::loadfile(const fs::path &fpath)
void
llarp_nodedb::visit(std::function< bool(const llarp::RouterContact &) > visit)
{
llarp::util::Lock lock(&access);
llarp::util::Lock lock(access);
auto itr = entries.begin();
while(itr != entries.end())
{
@ -331,7 +331,7 @@ llarp_nodedb::VisitInsertedBefore(
std::function< void(const llarp::RouterContact &) > visit,
llarp_time_t insertedAfter)
{
llarp::util::Lock lock(&access);
llarp::util::Lock lock(access);
auto itr = entries.begin();
while(itr != entries.end())
{
@ -487,14 +487,14 @@ llarp_nodedb::LoadAll()
size_t
llarp_nodedb::num_loaded() const
{
absl::ReaderMutexLock l(&access);
auto l = llarp::util::shared_lock(access);
return entries.size();
}
bool
llarp_nodedb::select_random_exit(llarp::RouterContact &result)
{
llarp::util::Lock lock(&access);
llarp::util::Lock lock(access);
const auto sz = entries.size();
auto itr = entries.begin();
if(sz < 3)
@ -529,7 +529,7 @@ bool
llarp_nodedb::select_random_hop(const llarp::RouterContact &prev,
llarp::RouterContact &result, size_t N)
{
llarp::util::Lock lock(&access);
llarp::util::Lock lock(access);
/// checking for "guard" status for N = 0 is done by caller inside of
/// pathbuilder's scope
size_t sz = entries.size();
@ -575,7 +575,7 @@ bool
llarp_nodedb::select_random_hop_excluding(
llarp::RouterContact &result, const std::set< llarp::RouterID > &exclude)
{
llarp::util::Lock lock(&access);
llarp::util::Lock lock(access);
/// checking for "guard" status for N = 0 is done by caller inside of
/// pathbuilder's scope
const size_t sz = entries.size();

View file

@ -6,10 +6,9 @@
#include <util/common.hpp>
#include <util/fs.hpp>
#include <util/thread/threading.hpp>
#include <util/thread/annotations.hpp>
#include <dht/key.hpp>
#include <absl/base/thread_annotations.h>
#include <set>
#include <utility>
@ -86,28 +85,27 @@ struct llarp_nodedb
ShouldSaveToDisk(llarp_time_t now = 0) const;
bool
Remove(const llarp::RouterID &pk) LOCKS_EXCLUDED(access);
Remove(const llarp::RouterID &pk) EXCLUDES(access);
void
RemoveIf(std::function< bool(const llarp::RouterContact &) > filter)
LOCKS_EXCLUDED(access);
EXCLUDES(access);
void
Clear() LOCKS_EXCLUDED(access);
Clear() EXCLUDES(access);
bool
Get(const llarp::RouterID &pk, llarp::RouterContact &result)
LOCKS_EXCLUDED(access);
Get(const llarp::RouterID &pk, llarp::RouterContact &result) EXCLUDES(access);
bool
Has(const llarp::RouterID &pk) LOCKS_EXCLUDED(access);
Has(const llarp::RouterID &pk) EXCLUDES(access);
std::string
getRCFilePath(const llarp::RouterID &pubkey) const;
/// insert without writing to disk
bool
Insert(const llarp::RouterContact &rc) LOCKS_EXCLUDED(access);
Insert(const llarp::RouterContact &rc) EXCLUDES(access);
/// invokes Insert() asynchronously with an optional completion
/// callback
@ -123,7 +121,7 @@ struct llarp_nodedb
UpdateAsyncIfNewer(llarp::RouterContact rc,
std::shared_ptr< llarp::Logic > l = nullptr,
std::function< void(void) > completionHandler = nullptr)
LOCKS_EXCLUDED(access);
EXCLUDES(access);
ssize_t
Load(const fs::path &path);
@ -135,11 +133,11 @@ struct llarp_nodedb
AsyncFlushToDisk();
bool
loadfile(const fs::path &fpath) LOCKS_EXCLUDED(access);
loadfile(const fs::path &fpath) EXCLUDES(access);
void
visit(std::function< bool(const llarp::RouterContact &) > visit)
LOCKS_EXCLUDED(access);
EXCLUDES(access);
void
set_dir(const char *dir);
@ -153,32 +151,31 @@ struct llarp_nodedb
/// visit all entries inserted into nodedb cache before a timestamp
void
VisitInsertedBefore(std::function< void(const llarp::RouterContact &) > visit,
llarp_time_t insertedAfter) LOCKS_EXCLUDED(access);
llarp_time_t insertedAfter) EXCLUDES(access);
void
RemoveStaleRCs(const std::set< llarp::RouterID > &keep, llarp_time_t cutoff);
size_t
num_loaded() const LOCKS_EXCLUDED(access);
num_loaded() const EXCLUDES(access);
bool
select_random_exit(llarp::RouterContact &rc) LOCKS_EXCLUDED(access);
select_random_exit(llarp::RouterContact &rc) EXCLUDES(access);
bool
select_random_hop(const llarp::RouterContact &prev,
llarp::RouterContact &result, size_t N)
LOCKS_EXCLUDED(access);
llarp::RouterContact &result, size_t N) EXCLUDES(access);
bool
select_random_hop_excluding(llarp::RouterContact &result,
const std::set< llarp::RouterID > &exclude)
LOCKS_EXCLUDED(access);
EXCLUDES(access);
static bool
ensure_dir(const char *dir);
void
SaveAll() LOCKS_EXCLUDED(access);
SaveAll() EXCLUDES(access);
};
/// struct for async rc verification

View file

@ -105,7 +105,7 @@ namespace llarp
HopHandler_ptr
MapGet(Map_t& map, const Key_t& k, CheckValue_t check, GetFunc_t get)
{
Lock_t lock(&map.first);
Lock_t lock(map.first);
auto range = map.second.equal_range(k);
for(auto i = range.first; i != range.second; ++i)
{
@ -120,7 +120,7 @@ namespace llarp
bool
MapHas(Map_t& map, const Key_t& k, CheckValue_t check)
{
Lock_t lock(&map.first);
Lock_t lock(map.first);
auto range = map.second.equal_range(k);
for(auto i = range.first; i != range.second; ++i)
{
@ -135,7 +135,7 @@ namespace llarp
void
MapPut(Map_t& map, const Key_t& k, const Value_t& v)
{
Lock_t lock(&map.first);
Lock_t lock(map.first);
map.second.emplace(k, v);
}
@ -168,8 +168,8 @@ namespace llarp
PathContext::AddOwnPath(PathSet_ptr set, Path_ptr path)
{
set->AddPath(path);
MapPut< SyncOwnedPathsMap_t::Lock_t >(m_OurPaths, path->TXID(), path);
MapPut< SyncOwnedPathsMap_t::Lock_t >(m_OurPaths, path->RXID(), path);
MapPut< util::Lock >(m_OurPaths, path->TXID(), path);
MapPut< util::Lock >(m_OurPaths, path->RXID(), path);
}
bool
@ -185,7 +185,7 @@ namespace llarp
HopHandler_ptr
PathContext::GetByUpstream(const RouterID& remote, const PathID_t& id)
{
auto own = MapGet< SyncOwnedPathsMap_t::Lock_t >(
auto own = MapGet< util::Lock >(
m_OurPaths, id,
[](const Path_ptr) -> bool {
// TODO: is this right?
@ -209,7 +209,7 @@ namespace llarp
PathContext::TransitHopPreviousIsRouter(const PathID_t& path,
const RouterID& otherRouter)
{
SyncTransitMap_t::Lock_t lock(&m_TransitPaths.first);
SyncTransitMap_t::Lock_t lock(m_TransitPaths.first);
auto itr = m_TransitPaths.second.find(path);
if(itr == m_TransitPaths.second.end())
return false;
@ -233,7 +233,7 @@ namespace llarp
PathContext::GetLocalPathSet(const PathID_t& id)
{
auto& map = m_OurPaths;
SyncOwnedPathsMap_t::Lock_t lock(&map.first);
util::Lock lock(map.first);
auto itr = map.second.find(id);
if(itr != map.second.end())
{
@ -260,7 +260,7 @@ namespace llarp
const RouterID us(OurRouterID());
auto& map = m_TransitPaths;
{
SyncTransitMap_t::Lock_t lock(&map.first);
SyncTransitMap_t::Lock_t lock(map.first);
auto range = map.second.equal_range(id);
for(auto i = range.first; i != range.second; ++i)
{
@ -300,7 +300,7 @@ namespace llarp
m_PathLimits.Decay(now);
{
SyncTransitMap_t::Lock_t lock(&m_TransitPaths.first);
SyncTransitMap_t::Lock_t lock(m_TransitPaths.first);
auto& map = m_TransitPaths.second;
auto itr = map.begin();
while(itr != map.end())
@ -315,7 +315,7 @@ namespace llarp
}
}
{
SyncOwnedPathsMap_t::Lock_t lock(&m_OurPaths.first);
util::Lock lock(m_OurPaths.first);
auto& map = m_OurPaths.second;
auto itr = map.begin();
while(itr != map.end())
@ -344,7 +344,7 @@ namespace llarp
const RouterID us(OurRouterID());
auto& map = m_TransitPaths;
{
SyncTransitMap_t::Lock_t lock(&map.first);
SyncTransitMap_t::Lock_t lock(map.first);
auto range = map.second.equal_range(id);
for(auto i = range.first; i != range.second; ++i)
{

View file

@ -122,9 +122,9 @@ namespace llarp
void
ForEach(std::function< void(const TransitHop_ptr&) > visit)
LOCKS_EXCLUDED(first)
EXCLUDES(first)
{
Lock_t lock(&first);
Lock_t lock(first);
for(const auto& item : second)
visit(item.second);
}
@ -136,15 +136,13 @@ namespace llarp
struct SyncOwnedPathsMap_t
{
using Mutex_t = util::Mutex;
using Lock_t = util::Lock;
Mutex_t first; // protects second
util::Mutex first; // protects second
OwnedPathsMap_t second GUARDED_BY(first);
void
ForEach(std::function< void(const Path_ptr&) > visit)
{
Lock_t lock(&first);
util::Lock lock(first);
for(const auto& item : second)
visit(item.second);
}

View file

@ -1 +0,0 @@
#include <path/path_types.hpp>

View file

@ -7,7 +7,6 @@
#include <profiling.hpp>
#include <router/abstractrouter.hpp>
#include <util/buffer.hpp>
#include <util/meta/memfn.hpp>
#include <util/thread/logic.hpp>
#include <functional>

View file

@ -27,7 +27,7 @@ namespace llarp
bool
PathSet::ShouldBuildMoreForRoles(llarp_time_t now, PathRole roles) const
{
Lock_t l(&m_PathsMutex);
Lock_t l(m_PathsMutex);
const size_t required = MinRequiredForRoles(roles);
size_t has = 0;
for(const auto& item : m_Paths)
@ -52,7 +52,7 @@ namespace llarp
PathSet::NumPathsExistingAt(llarp_time_t futureTime) const
{
size_t num = 0;
Lock_t l(&m_PathsMutex);
Lock_t l(m_PathsMutex);
for(const auto& item : m_Paths)
{
if(item.second->IsReady() && !item.second->Expired(futureTime))
@ -65,7 +65,7 @@ namespace llarp
PathSet::TickPaths(AbstractRouter* r)
{
const auto now = llarp::time_now_ms();
Lock_t l(&m_PathsMutex);
Lock_t l(m_PathsMutex);
for(auto& item : m_Paths)
{
item.second->Tick(now, r);
@ -75,7 +75,7 @@ namespace llarp
void
PathSet::ExpirePaths(llarp_time_t now, AbstractRouter* router)
{
Lock_t l(&m_PathsMutex);
Lock_t l(m_PathsMutex);
if(m_Paths.size() == 0)
return;
auto itr = m_Paths.begin();
@ -95,7 +95,7 @@ namespace llarp
Path_ptr
PathSet::GetEstablishedPathClosestTo(RouterID id, PathRole roles) const
{
Lock_t l(&m_PathsMutex);
Lock_t l(m_PathsMutex);
Path_ptr path = nullptr;
AlignedBuffer< 32 > dist;
AlignedBuffer< 32 > to = id;
@ -119,7 +119,7 @@ namespace llarp
Path_ptr
PathSet::GetNewestPathByRouter(RouterID id, PathRole roles) const
{
Lock_t l(&m_PathsMutex);
Lock_t l(m_PathsMutex);
Path_ptr chosen = nullptr;
auto itr = m_Paths.begin();
while(itr != m_Paths.end())
@ -142,7 +142,7 @@ namespace llarp
Path_ptr
PathSet::GetPathByRouter(RouterID id, PathRole roles) const
{
Lock_t l(&m_PathsMutex);
Lock_t l(m_PathsMutex);
Path_ptr chosen = nullptr;
auto itr = m_Paths.begin();
while(itr != m_Paths.end())
@ -165,7 +165,7 @@ namespace llarp
Path_ptr
PathSet::GetByEndpointWithID(RouterID ep, PathID_t id) const
{
Lock_t l(&m_PathsMutex);
Lock_t l(m_PathsMutex);
auto itr = m_Paths.begin();
while(itr != m_Paths.end())
{
@ -181,7 +181,7 @@ namespace llarp
Path_ptr
PathSet::GetPathByID(PathID_t id) const
{
Lock_t l(&m_PathsMutex);
Lock_t l(m_PathsMutex);
auto itr = m_Paths.begin();
while(itr != m_Paths.end())
{
@ -195,7 +195,7 @@ namespace llarp
size_t
PathSet::AvailablePaths(PathRole roles) const
{
Lock_t l(&m_PathsMutex);
Lock_t l(m_PathsMutex);
size_t count = 0;
auto itr = m_Paths.begin();
while(itr != m_Paths.end())
@ -211,7 +211,7 @@ namespace llarp
size_t
PathSet::NumInStatus(PathStatus st) const
{
Lock_t l(&m_PathsMutex);
Lock_t l(m_PathsMutex);
size_t count = 0;
auto itr = m_Paths.begin();
while(itr != m_Paths.end())
@ -226,7 +226,7 @@ namespace llarp
void
PathSet::AddPath(Path_ptr path)
{
Lock_t l(&m_PathsMutex);
Lock_t l(m_PathsMutex);
const auto upstream = path->Upstream(); // RouterID
const auto RXID = path->RXID(); // PathID
if(not m_Paths.emplace(std::make_pair(upstream, RXID), path).second)
@ -240,14 +240,14 @@ namespace llarp
void
PathSet::RemovePath(Path_ptr path)
{
Lock_t l(&m_PathsMutex);
Lock_t l(m_PathsMutex);
m_Paths.erase({path->Upstream(), path->RXID()});
}
Path_ptr
PathSet::GetByUpstream(RouterID remote, PathID_t rxid) const
{
Lock_t l(&m_PathsMutex);
Lock_t l(m_PathsMutex);
auto itr = m_Paths.find({remote, rxid});
if(itr == m_Paths.end())
return nullptr;
@ -261,7 +261,7 @@ namespace llarp
{
intros.clear();
size_t count = 0;
Lock_t l(&m_PathsMutex);
Lock_t l(m_PathsMutex);
auto itr = m_Paths.begin();
while(itr != m_Paths.end())
{
@ -281,7 +281,7 @@ namespace llarp
{
intros.clear();
size_t count = 0;
Lock_t l(&m_PathsMutex);
Lock_t l(m_PathsMutex);
auto itr = m_Paths.begin();
while(itr != m_Paths.end())
{
@ -350,7 +350,7 @@ namespace llarp
{
intro.Clear();
bool found = false;
Lock_t l(&m_PathsMutex);
Lock_t l(m_PathsMutex);
auto itr = m_Paths.begin();
while(itr != m_Paths.end())
{
@ -369,7 +369,7 @@ namespace llarp
PathSet::PickRandomEstablishedPath(PathRole roles) const
{
std::vector< Path_ptr > established;
Lock_t l(&m_PathsMutex);
Lock_t l(m_PathsMutex);
auto itr = m_Paths.begin();
while(itr != m_Paths.end())
{

View file

@ -266,7 +266,7 @@ namespace llarp
void
ForEachPath(std::function< void(const Path_ptr&) > visit) const
{
Lock_t lock(&m_PathsMutex);
Lock_t lock(m_PathsMutex);
auto itr = m_Paths.begin();
while(itr != m_Paths.end())
{

View file

@ -118,7 +118,7 @@ namespace llarp
{
if(m_DisableProfiling.load())
return false;
lock_t lock(&m_ProfilesMutex);
util::Lock lock(m_ProfilesMutex);
auto itr = m_Profiles.find(r);
if(itr == m_Profiles.end())
return false;
@ -130,7 +130,7 @@ namespace llarp
{
if(m_DisableProfiling.load())
return false;
lock_t lock(&m_ProfilesMutex);
util::Lock lock(m_ProfilesMutex);
auto itr = m_Profiles.find(r);
if(itr == m_Profiles.end())
return false;
@ -142,7 +142,7 @@ namespace llarp
{
if(m_DisableProfiling.load())
return false;
lock_t lock(&m_ProfilesMutex);
util::Lock lock(m_ProfilesMutex);
auto itr = m_Profiles.find(r);
if(itr == m_Profiles.end())
return false;
@ -152,7 +152,7 @@ namespace llarp
void
Profiling::Tick()
{
lock_t lock(&m_ProfilesMutex);
util::Lock lock(m_ProfilesMutex);
std::for_each(m_Profiles.begin(), m_Profiles.end(),
[](auto& item) { item.second.Tick(); });
}
@ -160,7 +160,7 @@ namespace llarp
void
Profiling::MarkConnectTimeout(const RouterID& r)
{
lock_t lock(&m_ProfilesMutex);
util::Lock lock(m_ProfilesMutex);
m_Profiles[r].connectTimeoutCount += 1;
m_Profiles[r].lastUpdated = llarp::time_now_ms();
}
@ -168,7 +168,7 @@ namespace llarp
void
Profiling::MarkConnectSuccess(const RouterID& r)
{
lock_t lock(&m_ProfilesMutex);
util::Lock lock(m_ProfilesMutex);
m_Profiles[r].connectGoodCount += 1;
m_Profiles[r].lastUpdated = llarp::time_now_ms();
}
@ -176,14 +176,14 @@ namespace llarp
void
Profiling::ClearProfile(const RouterID& r)
{
lock_t lock(&m_ProfilesMutex);
util::Lock lock(m_ProfilesMutex);
m_Profiles.erase(r);
}
void
Profiling::MarkHopFail(const RouterID& r)
{
lock_t lock(&m_ProfilesMutex);
util::Lock lock(m_ProfilesMutex);
m_Profiles[r].pathFailCount += 1;
m_Profiles[r].lastUpdated = llarp::time_now_ms();
}
@ -191,7 +191,7 @@ namespace llarp
void
Profiling::MarkPathFail(path::Path* p)
{
lock_t lock(&m_ProfilesMutex);
util::Lock lock(m_ProfilesMutex);
size_t idx = 0;
for(const auto& hop : p->hops)
{
@ -208,7 +208,7 @@ namespace llarp
void
Profiling::MarkPathSuccess(path::Path* p)
{
lock_t lock(&m_ProfilesMutex);
util::Lock lock(m_ProfilesMutex);
const auto sz = p->hops.size();
for(const auto& hop : p->hops)
{
@ -220,7 +220,7 @@ namespace llarp
bool
Profiling::Save(const char* fname)
{
absl::ReaderMutexLock lock(&m_ProfilesMutex);
auto lock = util::shared_lock(m_ProfilesMutex);
size_t sz = (m_Profiles.size() * (RouterProfile::MaxSize + 32 + 8)) + 8;
std::vector< byte_t > tmp(sz, 0);
@ -247,7 +247,7 @@ namespace llarp
bool
Profiling::BEncode(llarp_buffer_t* buf) const
{
absl::ReaderMutexLock lock(&m_ProfilesMutex);
auto lock = util::shared_lock(m_ProfilesMutex);
return BEncodeNoLock(buf);
}
@ -284,7 +284,7 @@ namespace llarp
bool
Profiling::Load(const char* fname)
{
lock_t lock(&m_ProfilesMutex);
util::Lock lock(m_ProfilesMutex);
m_Profiles.clear();
if(!BDecodeReadFromFile(fname, *this))
{

View file

@ -52,8 +52,7 @@ namespace llarp
/// generic variant
bool
IsBad(const RouterID& r, uint64_t chances = 8)
LOCKS_EXCLUDED(m_ProfilesMutex);
IsBad(const RouterID& r, uint64_t chances = 8) EXCLUDES(m_ProfilesMutex);
/// check if this router should have paths built over it
bool
@ -63,31 +62,31 @@ namespace llarp
/// check if this router should be connected directly to
bool
IsBadForConnect(const RouterID& r, uint64_t chances = 8)
LOCKS_EXCLUDED(m_ProfilesMutex);
EXCLUDES(m_ProfilesMutex);
void
MarkConnectTimeout(const RouterID& r) LOCKS_EXCLUDED(m_ProfilesMutex);
MarkConnectTimeout(const RouterID& r) EXCLUDES(m_ProfilesMutex);
void
MarkConnectSuccess(const RouterID& r) LOCKS_EXCLUDED(m_ProfilesMutex);
MarkConnectSuccess(const RouterID& r) EXCLUDES(m_ProfilesMutex);
void
MarkPathFail(path::Path* p) LOCKS_EXCLUDED(m_ProfilesMutex);
MarkPathFail(path::Path* p) EXCLUDES(m_ProfilesMutex);
void
MarkPathSuccess(path::Path* p) LOCKS_EXCLUDED(m_ProfilesMutex);
MarkPathSuccess(path::Path* p) EXCLUDES(m_ProfilesMutex);
void
MarkHopFail(const RouterID& r) LOCKS_EXCLUDED(m_ProfilesMutex);
MarkHopFail(const RouterID& r) EXCLUDES(m_ProfilesMutex);
void
ClearProfile(const RouterID& r) LOCKS_EXCLUDED(m_ProfilesMutex);
ClearProfile(const RouterID& r) EXCLUDES(m_ProfilesMutex);
void
Tick() LOCKS_EXCLUDED(m_ProfilesMutex);
Tick() EXCLUDES(m_ProfilesMutex);
bool
BEncode(llarp_buffer_t* buf) const LOCKS_EXCLUDED(m_ProfilesMutex);
BEncode(llarp_buffer_t* buf) const EXCLUDES(m_ProfilesMutex);
bool
DecodeKey(const llarp_buffer_t& k,
@ -95,10 +94,10 @@ namespace llarp
// disabled because we do load -> bencode::BDecodeReadFromFile -> DecodeKey
bool
Load(const char* fname) LOCKS_EXCLUDED(m_ProfilesMutex);
Load(const char* fname) EXCLUDES(m_ProfilesMutex);
bool
Save(const char* fname) LOCKS_EXCLUDED(m_ProfilesMutex);
Save(const char* fname) EXCLUDES(m_ProfilesMutex);
bool
ShouldSave(llarp_time_t now) const;
@ -111,9 +110,7 @@ namespace llarp
private:
bool
BEncodeNoLock(llarp_buffer_t* buf) const
SHARED_LOCKS_REQUIRED(m_ProfilesMutex);
using lock_t = util::Lock;
BEncodeNoLock(llarp_buffer_t* buf) const REQUIRES_SHARED(m_ProfilesMutex);
mutable util::Mutex m_ProfilesMutex; // protects m_Profiles
std::map< RouterID, RouterProfile > m_Profiles GUARDED_BY(m_ProfilesMutex);
llarp_time_t m_LastSave = 0;

View file

@ -1 +0,0 @@
#include <router/abstractrouter.hpp>

View file

@ -1 +0,0 @@
#include <router/i_gossiper.hpp>

View file

@ -1 +0,0 @@
#include <router/i_outbound_message_handler.hpp>

View file

@ -1 +0,0 @@
#include <router/i_outbound_session_maker.hpp>

View file

@ -1 +0,0 @@
#include <router/i_rc_lookup_handler.hpp>

View file

@ -47,7 +47,7 @@ namespace llarp
bool shouldCreateSession = false;
{
util::Lock l(&_mutex);
util::Lock l(_mutex);
// create queue for <remote> if it doesn't exist, and get iterator
auto itr_pair =
@ -404,7 +404,7 @@ namespace llarp
{
MessageQueue movedMessages;
{
util::Lock l(&_mutex);
util::Lock l(_mutex);
auto itr = pendingSessionMessageQueues.find(router);
if(itr == pendingSessionMessageQueues.end())

View file

@ -30,7 +30,7 @@ namespace llarp
bool
QueueMessage(const RouterID &remote, const ILinkMessage *msg,
SendStatusHandler callback) override LOCKS_EXCLUDED(_mutex);
SendStatusHandler callback) override EXCLUDES(_mutex);
void
Tick() override;
@ -122,7 +122,7 @@ namespace llarp
void
FinalizeSessionRequest(const RouterID &router, SendStatus status)
LOCKS_EXCLUDED(_mutex);
EXCLUDES(_mutex);
llarp::thread::Queue< MessageQueueEntry > outboundQueue;
llarp::thread::Queue< PathID_t > removedPaths;

View file

@ -68,7 +68,7 @@ namespace llarp
{
if(on_result)
{
util::Lock l(&_mutex);
util::Lock l(_mutex);
auto itr_pair = pendingCallbacks.emplace(router, CallbacksQueue{});
itr_pair.first->second.push_back(on_result);
@ -94,7 +94,7 @@ namespace llarp
{
if(on_result)
{
util::Lock l(&_mutex);
util::Lock l(_mutex);
auto itr_pair = pendingCallbacks.emplace(rc.pubkey, CallbacksQueue{});
itr_pair.first->second.push_back(on_result);
@ -112,7 +112,7 @@ namespace llarp
bool
OutboundSessionMaker::HavePendingSessionTo(const RouterID &router) const
{
util::Lock l(&_mutex);
util::Lock l(_mutex);
return pendingSessions.find(router) != pendingSessions.end();
}
@ -169,7 +169,7 @@ namespace llarp
void
OutboundSessionMaker::DoEstablish(const RouterID &router)
{
util::ReleasableLock l(&_mutex);
auto l = util::unique_lock(_mutex);
auto itr = pendingSessions.find(router);
@ -183,7 +183,7 @@ namespace llarp
{
// TODO: maybe different failure type?
l.Release();
l.unlock();
FinalizeRequest(router, SessionResult::NoLink);
}
}
@ -193,7 +193,7 @@ namespace llarp
const RouterContact &rc)
{
{
util::ReleasableLock l(&_mutex);
auto l = util::unique_lock(_mutex);
// in case other request found RC for this router after this request was
// made
@ -207,7 +207,7 @@ namespace llarp
if(!link)
{
l.Release();
l.unlock();
FinalizeRequest(router, SessionResult::NoLink);
return;
}
@ -230,7 +230,7 @@ namespace llarp
return false;
size_t numPending = 0;
{
util::Lock lock(&_mutex);
util::Lock lock(_mutex);
if(pendingSessions.find(router) == pendingSessions.end())
numPending += pendingSessions.size();
}
@ -300,7 +300,7 @@ namespace llarp
void
OutboundSessionMaker::CreatePendingSession(const RouterID &router)
{
util::Lock l(&_mutex);
util::Lock l(_mutex);
pendingSessions.emplace(router, nullptr);
}
@ -310,7 +310,7 @@ namespace llarp
{
CallbacksQueue movedCallbacks;
{
util::Lock l(&_mutex);
util::Lock l(_mutex);
if(type == SessionResult::Establish)
{
@ -338,7 +338,7 @@ namespace llarp
}
{
util::Lock l(&_mutex);
util::Lock l(_mutex);
pendingSessions.erase(router);
}
}

View file

@ -38,15 +38,15 @@ namespace llarp
void
CreateSessionTo(const RouterID &router, RouterCallback on_result) override
LOCKS_EXCLUDED(_mutex);
EXCLUDES(_mutex);
void
CreateSessionTo(const RouterContact &rc, RouterCallback on_result) override
LOCKS_EXCLUDED(_mutex);
EXCLUDES(_mutex);
bool
HavePendingSessionTo(const RouterID &router) const override
LOCKS_EXCLUDED(_mutex);
EXCLUDES(_mutex);
void
ConnectToRandomRouters(int numDesired) override;
@ -55,8 +55,7 @@ namespace llarp
ExtractStatus() const override;
bool
ShouldConnectTo(const RouterID &router) const override
LOCKS_EXCLUDED(_mutex);
ShouldConnectTo(const RouterID &router) const override EXCLUDES(_mutex);
void
Init(ILinkManager *linkManager, I_RCLookupHandler *rcLookup,
@ -77,11 +76,11 @@ namespace llarp
private:
void
DoEstablish(const RouterID &router) LOCKS_EXCLUDED(_mutex);
DoEstablish(const RouterID &router) EXCLUDES(_mutex);
void
GotRouterContact(const RouterID &router, const RouterContact &rc)
LOCKS_EXCLUDED(_mutex);
EXCLUDES(_mutex);
void
InvalidRouter(const RouterID &router);
@ -97,11 +96,11 @@ namespace llarp
VerifyRC(const RouterContact rc);
void
CreatePendingSession(const RouterID &router) LOCKS_EXCLUDED(_mutex);
CreatePendingSession(const RouterID &router) EXCLUDES(_mutex);
void
FinalizeRequest(const RouterID &router, const SessionResult type)
LOCKS_EXCLUDED(_mutex);
EXCLUDES(_mutex);
mutable util::Mutex _mutex; // protects pendingSessions, pendingCallbacks

View file

@ -6,7 +6,6 @@
#include <crypto/crypto.hpp>
#include <service/context.hpp>
#include <router_contact.hpp>
#include <util/meta/memfn.hpp>
#include <util/types.hpp>
#include <util/thread/threading.hpp>
#include <nodedb.hpp>
@ -24,14 +23,14 @@ namespace llarp
void
RCLookupHandler::AddValidRouter(const RouterID &router)
{
util::Lock l(&_mutex);
util::Lock l(_mutex);
whitelistRouters.insert(router);
}
void
RCLookupHandler::RemoveValidRouter(const RouterID &router)
{
util::Lock l(&_mutex);
util::Lock l(_mutex);
whitelistRouters.erase(router);
}
@ -40,7 +39,7 @@ namespace llarp
{
if(routers.empty())
return;
util::Lock l(&_mutex);
util::Lock l(_mutex);
whitelistRouters.clear();
for(auto &router : routers)
@ -55,7 +54,7 @@ namespace llarp
bool
RCLookupHandler::HaveReceivedWhitelist()
{
util::Lock l(&_mutex);
util::Lock l(_mutex);
return not whitelistRouters.empty();
}
@ -79,7 +78,7 @@ namespace llarp
bool shouldDoLookup = false;
{
util::Lock l(&_mutex);
util::Lock l(_mutex);
auto itr_pair = pendingCallbacks.emplace(router, CallbacksQueue{});
@ -132,7 +131,7 @@ namespace llarp
return false;
}
util::Lock l(&_mutex);
util::Lock l(_mutex);
if(useWhitelist && whitelistRouters.find(remote) == whitelistRouters.end())
{
@ -178,7 +177,7 @@ namespace llarp
bool
RCLookupHandler::GetRandomWhitelistRouter(RouterID &router) const
{
util::Lock l(&_mutex);
util::Lock l(_mutex);
const auto sz = whitelistRouters.size();
auto itr = whitelistRouters.begin();
@ -266,7 +265,7 @@ namespace llarp
{
// if we are using a whitelist look up a few routers we don't have
util::Lock l(&_mutex);
util::Lock l(_mutex);
for(const auto &r : whitelistRouters)
{
if(now > _routerLookupTimes[r] + RerequestInterval
@ -359,7 +358,7 @@ namespace llarp
bool
RCLookupHandler::HavePendingLookup(RouterID remote) const
{
util::Lock l(&_mutex);
util::Lock l(_mutex);
return pendingCallbacks.find(remote) != pendingCallbacks.end();
}
@ -383,7 +382,7 @@ namespace llarp
{
CallbacksQueue movedCallbacks;
{
util::Lock l(&_mutex);
util::Lock l(_mutex);
auto itr = pendingCallbacks.find(router);

View file

@ -32,32 +32,30 @@ namespace llarp
~RCLookupHandler() override = default;
void
AddValidRouter(const RouterID &router) override LOCKS_EXCLUDED(_mutex);
AddValidRouter(const RouterID &router) override EXCLUDES(_mutex);
void
RemoveValidRouter(const RouterID &router) override LOCKS_EXCLUDED(_mutex);
RemoveValidRouter(const RouterID &router) override EXCLUDES(_mutex);
void
SetRouterWhitelist(const std::vector< RouterID > &routers) override
LOCKS_EXCLUDED(_mutex);
EXCLUDES(_mutex);
bool
HaveReceivedWhitelist();
void
GetRC(const RouterID &router, RCRequestCallback callback,
bool forceLookup = false) override LOCKS_EXCLUDED(_mutex);
bool forceLookup = false) override EXCLUDES(_mutex);
bool
RemoteIsAllowed(const RouterID &remote) const override
LOCKS_EXCLUDED(_mutex);
RemoteIsAllowed(const RouterID &remote) const override EXCLUDES(_mutex);
bool
CheckRC(const RouterContact &rc) const override;
bool
GetRandomWhitelistRouter(RouterID &router) const override
LOCKS_EXCLUDED(_mutex);
GetRandomWhitelistRouter(RouterID &router) const override EXCLUDES(_mutex);
bool
CheckRenegotiateValid(RouterContact newrc, RouterContact oldrc) override;
@ -85,14 +83,14 @@ namespace llarp
const std::vector< RouterContact > &results);
bool
HavePendingLookup(RouterID remote) const LOCKS_EXCLUDED(_mutex);
HavePendingLookup(RouterID remote) const EXCLUDES(_mutex);
bool
RemoteInBootstrap(const RouterID &remote) const;
void
FinalizeRequest(const RouterID &router, const RouterContact *const rc,
RCRequestResult result) LOCKS_EXCLUDED(_mutex);
RCRequestResult result) EXCLUDES(_mutex);
mutable util::Mutex _mutex; // protects pendingCallbacks, whitelistRouters

View file

@ -20,7 +20,6 @@
#include <util/logging/logger_syslog.hpp>
#include <util/logging/logger.hpp>
#include <util/meta/memfn.hpp>
#include <util/metrics/metrics.hpp>
#include <util/str.hpp>
#include <ev/ev.hpp>
@ -141,7 +140,7 @@ namespace llarp
return true;
};
absl::ReaderMutexLock l(&nodedb()->access);
auto l = util::shared_lock(nodedb()->access);
return pick_router(nodedb()->entries);
}
@ -780,7 +779,6 @@ namespace llarp
bool
Router::Sign(Signature &sig, const llarp_buffer_t &buf) const
{
metrics::TimerGuard t("Router", "Sign");
return CryptoManager::instance()->sign(sig, identity(), buf);
}

View file

@ -1 +0,0 @@
#include <routing/handler.hpp>

View file

@ -1 +0,0 @@
#include <routing/message.hpp>

View file

@ -853,7 +853,7 @@ namespace llarp
{
if(msg->proto == eProtocolTrafficV4 || msg->proto == eProtocolTrafficV6)
{
util::Lock l(&m_state->m_InboundTrafficQueueMutex);
util::Lock l(m_state->m_InboundTrafficQueueMutex);
m_state->m_InboundTrafficQueue.emplace(msg);
return true;
}
@ -903,7 +903,7 @@ namespace llarp
return false;
{
LogWarn("invalidating convotag T=", frame.T);
util::Lock lock(&m_state->m_SendQueueMutex);
util::Lock lock(m_state->m_SendQueueMutex);
m_state->m_SendQueue.emplace_back(
std::make_shared< const routing::PathTransferMessage >(f,
frame.F),
@ -1080,7 +1080,7 @@ namespace llarp
for(const auto& item : sessions)
item.second.first->FlushDownstream();
// send downstream traffic to user for hidden service
util::Lock lock(&m_state->m_InboundTrafficQueueMutex);
util::Lock lock(m_state->m_InboundTrafficQueueMutex);
while(not queue.empty())
{
const auto& msg = queue.top();
@ -1106,7 +1106,7 @@ namespace llarp
for(const auto& item : sessions)
item.second.first->FlushUpstream();
{
util::Lock lock(&m_state->m_SendQueueMutex);
util::Lock lock(m_state->m_SendQueueMutex);
// send outbound traffic
for(const auto& item : m_state->m_SendQueue)
{
@ -1206,7 +1206,7 @@ namespace llarp
return;
}
util::Lock lock(&self->m_state->m_SendQueueMutex);
util::Lock lock(self->m_state->m_SendQueueMutex);
self->m_state->m_SendQueue.emplace_back(transfer, p);
});
}

View file

@ -1 +0,0 @@
#include <service/handler.hpp>

View file

@ -1 +0,0 @@
#include <service/pendingbuffer.hpp>

View file

@ -1 +0,0 @@
#include <service/vanity.hpp>

View file

@ -1 +0,0 @@
#include <util/aligned.hpp>

View file

@ -1 +0,0 @@
#include <util/bits.hpp>

View file

@ -1 +0,0 @@
#include <util/codel.hpp>

View file

@ -28,8 +28,9 @@ namespace llarp
template < typename T, typename GetTime, typename PutTime, typename Compare,
typename GetNow = GetNowSyscall, typename Mutex_t = util::Mutex,
typename Lock_t = util::Lock, llarp_time_t dropMs = 5,
llarp_time_t initialIntervalMs = 100, size_t MaxSize = 1024 >
typename Lock_t = std::lock_guard< Mutex_t >,
llarp_time_t dropMs = 5, llarp_time_t initialIntervalMs = 100,
size_t MaxSize = 1024 >
struct CoDelQueue
{
CoDelQueue(std::string name, PutTime put, GetNow now)
@ -41,7 +42,7 @@ namespace llarp
}
size_t
Size() LOCKS_EXCLUDED(m_QueueMutex)
Size() EXCLUDES(m_QueueMutex)
{
Lock_t lock(m_QueueMutex);
return m_QueueIdx;
@ -50,9 +51,9 @@ namespace llarp
template < typename... Args >
bool
EmplaceIf(std::function< bool(T&) > pred, Args&&... args)
LOCKS_EXCLUDED(m_QueueMutex)
EXCLUDES(m_QueueMutex)
{
Lock_t lock(&m_QueueMutex);
Lock_t lock(m_QueueMutex);
if(m_QueueIdx == MaxSize)
return false;
T* t = &m_Queue[m_QueueIdx];
@ -73,9 +74,9 @@ namespace llarp
template < typename... Args >
void
Emplace(Args&&... args) LOCKS_EXCLUDED(m_QueueMutex)
Emplace(Args&&... args) EXCLUDES(m_QueueMutex)
{
Lock_t lock(&m_QueueMutex);
Lock_t lock(m_QueueMutex);
if(m_QueueIdx == MaxSize)
return;
T* t = &m_Queue[m_QueueIdx];
@ -95,13 +96,13 @@ namespace llarp
template < typename Visit, typename Filter >
void
Process(Visit visitor, Filter f) LOCKS_EXCLUDED(m_QueueMutex)
Process(Visit visitor, Filter f) EXCLUDES(m_QueueMutex)
{
llarp_time_t lowest = std::numeric_limits< llarp_time_t >::max();
if(_getNow() < nextTickAt)
return;
// llarp::LogInfo("CoDelQueue::Process - start at ", start);
Lock_t lock(&m_QueueMutex);
Lock_t lock(m_QueueMutex);
auto start = firstPut;
if(m_QueueIdx == 1)

View file

@ -1 +0,0 @@
#include <util/common.hpp>

View file

@ -1 +0,0 @@
#include <util/decaying_hashset.hpp>

View file

@ -1 +0,0 @@
#include <util/endian.hpp>

View file

@ -1 +0,0 @@
#include <util/meta/memfn.hpp>

View file

@ -1,88 +1,68 @@
#ifndef LLARP_UTIL_MEMFN
#define LLARP_UTIL_MEMFN
#include <util/meta/memfn_traits.hpp>
#include <util/meta/object.hpp>
#include <util/meta/traits.hpp>
#include <functional>
#include <type_traits>
#include <utility>
#include <memory>
namespace llarp
{
namespace util
{
template < typename Obj >
struct MemFnDereference
// Wraps a member function and instance into a callable object that invokes
// the method (non-const overload).
template <
typename Return, typename Class, typename Derived, typename... Arg,
typename =
std::enable_if_t< std::is_base_of< Class, Derived >::value > >
auto
memFn(Return (Class::*f)(Arg...), Derived* self)
{
// clang-format off
static inline Obj& derefImp(Obj& obj, std::false_type)
{
return obj;
}
return [f, self](Arg... args) -> Return {
return (self->*f)(std::forward< Arg >(args)...);
};
}
template < typename Type >
static inline Obj& derefImp(Type& obj, std::true_type)
{
return *obj;
}
template < typename Type >
static inline Obj& derefImp(const Type& obj, std::true_type)
{
return *obj;
}
template < typename Type >
static inline Obj& deref(Type& obj)
{
return derefImp(obj, traits::is_pointy< Type >());
}
template < typename Type >
static inline Obj& deref(const Type& obj)
{
return derefImp(obj, traits::is_pointy< Type >());
}
// clang-format on
};
template < typename Prototype, typename Instance >
class MemFn
// Wraps a member function and instance into a lambda that invokes the
// method (const overload).
template <
typename Return, typename Class, typename Derived, typename... Arg,
typename =
std::enable_if_t< std::is_base_of< Class, Derived >::value > >
auto
memFn(Return (Class::*f)(Arg...) const, const Derived* self)
{
using traits = MemFnTraits< Prototype >;
return [f, self](Arg... args) -> Return {
return (self->*f)(std::forward< Arg >(args)...);
};
}
static_assert(traits::IsMemFn, "");
public:
using result_type = typename traits::result_type;
private:
Prototype m_func;
object::Proxy< Instance > m_instance;
using Deref = MemFnDereference< typename traits::class_type >;
public:
MemFn(Prototype prototype, const Instance& instance)
: m_func(prototype), m_instance(instance)
{
}
template < typename... Args >
result_type
operator()(Args&&... args) const
{
return (Deref::deref(m_instance.value())
.*m_func)(std::forward< Args >(args)...);
}
};
template < typename Prototype, typename Instance >
MemFn< Prototype, Instance >
memFn(Prototype prototype, const Instance& instance)
// Wraps a member function and shared pointer to an instance into a lambda
// that invokes the method.
template <
typename Return, typename Class, typename Derived, typename... Arg,
typename =
std::enable_if_t< std::is_base_of< Class, Derived >::value > >
auto
memFn(Return (Class::*f)(Arg...), std::shared_ptr< Derived > self)
{
return MemFn< Prototype, Instance >(prototype, instance);
return [f, self = std::move(self)](Arg... args) -> Return {
return (self.get()->*f)(std::forward< Arg >(args)...);
};
}
// Wraps a member function and shared pointer to an instance into a lambda
// that invokes the method (const method overload).
template <
typename Return, typename Class, typename Derived, typename... Arg,
typename =
std::enable_if_t< std::is_base_of< Class, Derived >::value > >
auto
memFn(Return (Class::*f)(Arg...) const, std::shared_ptr< Derived > self)
{
return [f, self = std::move(self)](Arg... args) -> Return {
return (self.get()->*f)(std::forward< Arg >(args)...);
};
}
} // namespace util

View file

@ -1 +0,0 @@
#include <util/meta/memfn_traits.hpp>

View file

@ -1,107 +0,0 @@
#ifndef LLARP_UTIL_MEMFN_TRAITS
#define LLARP_UTIL_MEMFN_TRAITS
#include <util/meta/object.hpp>
#include <util/meta/traits.hpp>
#include <functional>
#include <utility>
namespace llarp
{
namespace util
{
template < typename Prototype, typename TestPrototype >
struct MemFnTraitsImpl;
template < typename Prototype >
struct MemFnTraits : public MemFnTraitsImpl< Prototype, Prototype >
{
};
template < typename Prototype, typename Return, typename Type,
typename... Args >
class MemFnTraitsClass
{
using NonCVTag = traits::Tag< 0 >;
using ConstTag = traits::Tag< 1 >;
using VolTag = traits::Tag< 2 >;
using ConstVolTag = traits::Tag< 3 >;
// clang-format off
static constexpr NonCVTag test(Return (Type::*)(Args...));
static constexpr ConstTag test(Return (Type::*)(Args...) const);
static constexpr VolTag test(Return (Type::*)(Args...) volatile);
static constexpr ConstVolTag test(Return (Type::*)(Args...) const volatile);
// clang-format on
public:
static constexpr bool IsConst =
((sizeof((test)((Prototype)0)) - 1) & 1) != 0;
static constexpr bool IsVolatile =
((sizeof((test)((Prototype)0)) - 1) & 2) != 0;
using ctype = std::conditional_t< IsConst, const Type, Type >;
using type = std::conditional_t< IsVolatile, volatile ctype, ctype >;
};
template < typename Prototype, typename Return, typename Type,
typename... Args >
struct MemFnTraitsImpl< Prototype, Return (Type::*)(Args...) >
{
static constexpr bool IsMemFn = true;
using class_type =
typename MemFnTraitsClass< Prototype, Return, Type, Args... >::type;
using result_type = Return;
};
template < typename Prototype, typename Return, typename Type,
typename... Args >
struct MemFnTraitsImpl< Prototype, Return (Type::*)(Args...) const >
{
static constexpr bool IsMemFn = true;
using class_type =
typename MemFnTraitsClass< Prototype, Return, Type, Args... >::type;
using result_type = Return;
};
template < typename Prototype, typename Return, typename Type,
typename... Args >
struct MemFnTraitsImpl< Prototype, Return (Type::*)(Args...) volatile >
{
static constexpr bool IsMemFn = true;
using class_type =
typename MemFnTraitsClass< Prototype, Return, Type, Args... >::type;
using result_type = Return;
};
template < typename Prototype, typename Return, typename Type,
typename... Args >
struct MemFnTraitsImpl< Prototype,
Return (Type::*)(Args...) const volatile >
{
static constexpr bool IsMemFn = true;
using class_type =
typename MemFnTraitsClass< Prototype, Return, Type, Args... >::type;
using result_type = Return;
};
template < typename Prototype, typename TestPrototype >
struct MemFnTraitsImpl
{
static constexpr bool IsMemFn = false;
using result_type = void;
using class_type = void;
};
} // namespace util
} // namespace llarp
#endif

View file

@ -1 +0,0 @@
#include <util/meta/object.hpp>

View file

@ -1,469 +0,0 @@
#ifndef LLARP_OBJECT_HPP
#define LLARP_OBJECT_HPP
#include <util/thread/threading.hpp>
#include <nonstd/optional.hpp>
#include <vector>
namespace llarp
{
namespace object
{
/// Provide a buffer, capable of holding a single `Value` object.
/// This is useful for node-based data structures.
/// Note:
/// - This union explicitly does *not* manage the lifetime of the object,
/// explicit calls to the constructor/destructor must be made.
template < typename Value >
union Buffer {
private:
char m_buffer[sizeof(Value)];
char m_align[alignof(Value)];
public:
Value*
address()
{
return reinterpret_cast< Value* >(static_cast< void* >(m_buffer));
}
const Value*
address() const
{
return reinterpret_cast< Value* >(static_cast< void* >(m_buffer));
}
char*
buffer()
{
return m_buffer;
}
const char*
buffer() const
{
return m_buffer;
}
Value&
value()
{
return *reinterpret_cast< Value* >(this);
}
const Value&
value() const
{
return *reinterpret_cast< const Value* >(this);
}
};
template < typename Value >
class Proxy
{
Buffer< Value > m_value;
Proxy&
operator=(const Proxy&) = delete;
public:
Proxy()
{
::new(m_value.buffer()) Value();
}
Proxy(const Proxy& other)
{
::new(m_value.buffer()) Value(other.value());
}
Proxy(const Value& value)
{
::new(m_value.buffer()) Value(value);
}
// template < typename... Args >
// Proxy(Args&&... args)
// {
// ::new(m_value.buffer()) Value(std::forward< Args >(args)...);
// }
~Proxy()
{
m_value.address()->~Value();
}
Value&
value()
{
return m_value.value();
}
const Value&
value() const
{
return m_value.value();
}
};
template < typename Value >
class Catalog;
template < typename Value >
class CatalogIterator;
template < typename Value >
class CatalogCleaner
{
Catalog< Value >* m_catalog;
typename Catalog< Value >::Node* m_node;
bool m_shouldDelete;
CatalogCleaner(const CatalogCleaner&) = delete;
CatalogCleaner&
operator=(const CatalogCleaner&) = delete;
public:
explicit CatalogCleaner(Catalog< Value >* catalog)
: m_catalog(catalog), m_node(nullptr), m_shouldDelete(false)
{
}
~CatalogCleaner();
void
manageNode(typename Catalog< Value >::Node* node, bool shouldDelete)
{
m_node = node;
m_shouldDelete = shouldDelete;
}
void
releaseNode()
{
m_node = nullptr;
}
void
release()
{
releaseNode();
m_catalog = nullptr;
}
};
/// A pooling catalog of objects, referred to by a 32-bit handle
template < typename Value >
class Catalog
{
enum
{
INDEX_MASK = 0X007FFFFF,
BUSY_INDICATOR = 0x00800000,
GENERATION_INC = 0x01000000,
GENERATION_MASK = 0XFF000000
};
struct Node
{
union Payload {
Buffer< Value > m_buffer;
Node* m_next;
};
Payload m_payload;
int32_t m_handle;
};
std::vector< Node* > m_nodes GUARDED_BY(m_mutex);
Node* m_next;
std::atomic_size_t m_size;
mutable util::Mutex m_mutex;
friend class CatalogCleaner< Value >;
friend class CatalogIterator< Value >;
static Value*
getValue(Node* node)
{
return node->m_payload.m_buffer.address();
}
void
freeNode(Node* node)
{
node->m_handle += GENERATION_INC;
node->m_handle &= ~BUSY_INDICATOR;
node->m_payload.m_next = m_next;
m_next = node;
}
Node*
findNode(int32_t handle) const SHARED_LOCKS_REQUIRED(m_mutex)
{
int32_t index = handle & INDEX_MASK;
if(0 > index || index >= static_cast< int32_t >(m_nodes.size())
|| !(handle & BUSY_INDICATOR))
{
return nullptr;
}
Node* node = m_nodes[index];
return (node->m_handle == handle) ? node : nullptr;
}
public:
Catalog() : m_next(nullptr), m_size(0)
{
}
~Catalog()
{
removeAll();
}
int32_t
add(const Value& value)
{
int32_t handle;
absl::WriterMutexLock l(&m_mutex);
CatalogCleaner< Value > guard(this);
Node* node;
if(m_next)
{
node = m_next;
m_next = node->m_payload.m_next;
guard.manageNode(node, false);
}
else
{
assert(m_nodes.size() < BUSY_INDICATOR);
node = new Node;
guard.manageNode(node, true);
m_nodes.push_back(node);
node->m_handle = static_cast< int32_t >(m_nodes.size() - 1);
guard.manageNode(node, false);
}
node->m_handle |= BUSY_INDICATOR;
handle = node->m_handle;
// construct into the node.
::new(getValue(node)) Value(value);
guard.release();
++m_size;
return handle;
}
bool
remove(int32_t handle, Value* value = nullptr)
{
absl::WriterMutexLock l(&m_mutex);
Node* node = findNode(handle);
if(!node)
{
return false;
}
Value* val = getValue(node);
if(value)
{
*value = *val;
}
val->~Value();
freeNode(node);
--m_size;
return true;
}
void
removeAll(std::vector< Value >* output = nullptr)
{
absl::WriterMutexLock l(&m_mutex);
for(Node* node : m_nodes)
{
if(node->m_handle & BUSY_INDICATOR)
{
Value* value = getValue(node);
if(output)
{
output->push_back(*value);
}
value->~Value();
}
delete node;
}
m_nodes.clear();
m_next = nullptr;
m_size = 0;
}
bool
replace(const Value& newValue, int32_t handle)
{
absl::WriterMutexLock l(&m_mutex);
Node* node = findNode(handle);
if(!node)
{
return false;
}
Value* value = getValue(node);
value->~Value();
// construct into the node.
::new(value) Value(newValue);
return true;
}
nonstd::optional< Value >
find(int32_t handle)
{
absl::ReaderMutexLock l(&m_mutex);
Node* node = findNode(handle);
if(!node)
{
return {};
}
return *getValue(node);
}
size_t
size() const
{
return m_size;
}
/// introduced for testing only. verify the current state of the catalog.
bool
verify() const;
};
template < typename Value >
class SCOPED_LOCKABLE CatalogIterator
{
const Catalog< Value >* m_catalog;
size_t m_index;
CatalogIterator(const CatalogIterator&) = delete;
CatalogIterator&
operator=(const CatalogIterator&) = delete;
public:
explicit CatalogIterator(const Catalog< Value >* catalog)
SHARED_LOCK_FUNCTION(m_catalog->m_mutex)
: m_catalog(catalog), m_index(-1)
{
m_catalog->m_mutex.ReaderLock();
operator++();
}
~CatalogIterator() UNLOCK_FUNCTION()
{
m_catalog->m_mutex.ReaderUnlock();
}
void
operator++() NO_THREAD_SAFETY_ANALYSIS
{
m_index++;
while(m_index < m_catalog->m_nodes.size()
&& !(m_catalog->m_nodes[m_index]->m_handle
& Catalog< Value >::BUSY_INDICATOR))
{
m_index++;
}
}
explicit operator bool() const NO_THREAD_SAFETY_ANALYSIS
{
return m_index < m_catalog->m_nodes.size();
}
std::pair< int32_t, Value >
operator()() const NO_THREAD_SAFETY_ANALYSIS
{
auto* node = m_catalog->m_nodes[m_index];
return {node->m_handle, *Catalog< Value >::getValue(node)};
}
};
template < typename Value >
CatalogCleaner< Value >::~CatalogCleaner()
{
if(m_catalog && m_node)
{
if(m_shouldDelete)
{
// We call the destructor elsewhere.
operator delete(m_node);
}
else
{
m_catalog->freeNode(m_node);
}
}
}
template < typename Value >
bool
Catalog< Value >::verify() const
{
absl::WriterMutexLock l(&m_mutex);
if(m_nodes.size() < m_size)
{
return false;
}
size_t busyCount = 0;
for(size_t i = 0; i < m_nodes.size(); i++)
{
if((m_nodes[i]->m_handle & INDEX_MASK) != i)
{
return false;
}
if(m_nodes[i]->m_handle & BUSY_INDICATOR)
{
busyCount++;
}
}
if(m_size != busyCount)
{
return false;
}
size_t freeCount = 0;
for(Node* p = m_next; p != nullptr; p = p->m_payload.m_next)
{
freeCount++;
}
if(freeCount + busyCount != m_nodes.size())
{
return false;
}
return true;
}
} // namespace object
} // namespace llarp
#endif

View file

@ -1 +0,0 @@
#include <util/meta/traits.hpp>

View file

@ -1,8 +1,6 @@
#ifndef LLARP_TRAITS_HPP
#define LLARP_TRAITS_HPP
#include <absl/meta/type_traits.h>
#include <cstddef>
#include <type_traits>
#include <utility>
@ -12,9 +10,18 @@ namespace llarp
{
namespace traits
{
using absl::conjunction;
using absl::disjunction;
using absl::void_t;
#ifdef __cpp_lib_void_t
using std::void_t;
#else
/// C++17 void_t backport
template < typename... Ts >
struct void_t_impl
{
using type = void;
};
template < typename... Ts >
using void_t = typename void_t_impl< Ts... >::type;
#endif
/// Represents the empty type
struct Bottom

View file

@ -1 +0,0 @@
#include <util/meta/variant.hpp>

View file

@ -1,45 +0,0 @@
#ifndef LLARP_VARIANT_HPP
#define LLARP_VARIANT_HPP
#include <absl/types/variant.h>
namespace llarp
{
namespace util
{
template < typename... Ts >
struct _overloaded;
template < typename T, typename... Ts >
struct _overloaded< T, Ts... > : T, _overloaded< Ts... >
{
_overloaded(T&& t, Ts&&... ts)
: T(t), _overloaded< Ts... >(std::forward< Ts >(ts)...)
{
}
using T::operator();
using _overloaded< Ts... >::operator();
};
template < typename T >
struct _overloaded< T > : T
{
_overloaded(T&& t) : T(t)
{
}
using T::operator();
};
template < typename... Ts >
constexpr auto
overloaded(Ts&&... ts) -> _overloaded< Ts... >
{
return _overloaded< Ts... >(std::forward< Ts >(ts)...);
}
} // namespace util
} // namespace llarp
#endif

View file

@ -1,753 +0,0 @@
#include <util/metrics/core.hpp>
#include <iostream>
namespace llarp
{
namespace metrics
{
std::pair< Id, bool >
Registry::insert(string_view category, string_view name)
{
// avoid life time issues, putting strings in the stringmem set
string_view cStr = m_stringmem.emplace(category).first->c_str();
string_view nStr = m_stringmem.emplace(name).first->c_str();
NamedCategory namedCategory(cStr, nStr);
const auto it = m_metrics.find(namedCategory);
if(it != m_metrics.end())
{
return {Id(it->second.get()), false};
}
auto cIt = m_categories.find(cStr);
if(cIt == m_categories.end())
{
auto ptr = std::make_shared< Category >(cStr, m_defaultEnabled);
cIt = m_categories.emplace(cStr, ptr).first;
}
const auto mPtr =
std::make_shared< Description >(cIt->second.get(), nStr);
m_metrics.emplace(namedCategory, mPtr);
return {Id(mPtr.get()), true};
}
Id
Registry::add(string_view category, string_view name)
{
absl::WriterMutexLock l(&m_mutex);
auto result = insert(category, name);
return std::get< 1 >(result) ? std::get< 0 >(result) : Id();
}
Id
Registry::get(string_view category, string_view name)
{
Id result = findId(category, name);
if(result)
{
return result;
}
absl::WriterMutexLock l(&m_mutex);
return std::get< 0 >(insert(category, name));
}
const Category *
Registry::add(string_view category)
{
absl::WriterMutexLock l(&m_mutex);
string_view cStr = m_stringmem.emplace(category).first->c_str();
auto it = m_categories.find(cStr);
if(it == m_categories.end())
{
auto ptr = std::make_shared< Category >(cStr, m_defaultEnabled);
it = m_categories.emplace(cStr, ptr).first;
return it->second.get();
}
return nullptr;
}
const Category *
Registry::get(string_view category)
{
const Category *cPtr = findCategory(category);
if(cPtr)
{
return cPtr;
}
absl::WriterMutexLock l(&m_mutex);
string_view cStr = m_stringmem.emplace(category).first->c_str();
auto it = m_categories.find(cStr);
if(it == m_categories.end())
{
auto ptr = std::make_shared< Category >(cStr, m_defaultEnabled);
it = m_categories.emplace(cStr, ptr).first;
}
return it->second.get();
}
void
Registry::enable(const Category *category, bool value)
{
absl::WriterMutexLock l(&m_mutex);
const_cast< Category * >(category)->enabled(value);
}
void
Registry::enableAll(bool value)
{
absl::WriterMutexLock l(&m_mutex);
if(value == m_defaultEnabled)
{
return;
}
m_defaultEnabled = value;
std::for_each(m_categories.begin(), m_categories.end(),
[&](auto &x) { x.second->enabled(value); });
}
void
Registry::registerContainer(const Category *category,
CategoryContainer &container)
{
absl::WriterMutexLock l(&m_mutex);
if(container.m_category == nullptr)
{
const_cast< Category * >(category)->registerContainer(&container);
}
}
void
Registry::publicationType(const Id &id, Publication::Type type)
{
const_cast< Description * >(id.description())->type(type);
}
void
Registry::setFormat(const Id &id, const Format &format)
{
auto *description = const_cast< Description * >(id.description());
absl::WriterMutexLock l(&m_mutex);
auto fmtPtr = std::make_shared< Format >(format);
for(byte_t i = 0; i < Publication::MaxSize; ++i)
{
auto type = static_cast< Publication::Type >(i);
const FormatSpec *spec = format.specFor(type);
if(spec != nullptr)
{
string_view fmt = m_stringmem.emplace(spec->m_format).first->c_str();
fmtPtr->setSpec(type, FormatSpec(spec->m_scale, fmt));
}
}
description->format(fmtPtr);
}
const Category *
Registry::findCategory(string_view category) const
{
absl::ReaderMutexLock l(&m_mutex);
auto it = m_categories.find(category);
return it == m_categories.end() ? nullptr : it->second.get();
}
Id
Registry::findId(string_view category, string_view name) const
{
absl::ReaderMutexLock l(&m_mutex);
auto it = m_metrics.find(std::make_tuple(category, name));
return it == m_metrics.end() ? Id() : Id(it->second.get());
}
std::vector< const Category * >
Registry::getAll() const
{
absl::ReaderMutexLock l(&m_mutex);
std::vector< const Category * > result;
result.reserve(m_categories.size());
std::transform(m_categories.begin(), m_categories.end(),
std::back_inserter(result),
[](const auto &x) { return x.second.get(); });
return result;
}
struct PublisherHelper
{
using SampleCache = std::map< std::shared_ptr< Publisher >, Sample >;
static void
updateSampleCache(SampleCache &cache,
const std::shared_ptr< Publisher > &publisher,
const SampleGroup< double > &doubleGroup,
const SampleGroup< int > &intGroup,
const absl::Time &timeStamp)
{
auto it = cache.find(publisher);
if(it == cache.end())
{
Sample sample;
sample.sampleTime(timeStamp);
it = cache.emplace(publisher, sample).first;
}
it->second.pushGroup(doubleGroup);
it->second.pushGroup(intGroup);
}
struct CollectResult
{
Records records;
absl::Duration samplePeriod;
};
static CollectResult
collect(Manager &manager, const Category *category,
const absl::Duration &now, bool clear)
EXCLUSIVE_LOCKS_REQUIRED(manager.m_mutex)
{
// Collect records from the repo.
const Records result = clear
? Records(manager.m_doubleRepo.collectAndClear(category),
manager.m_intRepo.collectAndClear(category))
: Records(manager.m_doubleRepo.collect(category),
manager.m_intRepo.collect(category));
// Get the time since last reset, and clear if needed.
auto it = manager.m_resetTimes.find(category);
if(it == manager.m_resetTimes.end())
{
if(clear)
{
manager.m_resetTimes.emplace(category, now);
}
return {result, now - manager.m_createTime};
}
auto tmp = now - it->second;
if(clear)
{
it->second = now;
}
return {result, tmp};
}
template < typename Type >
using RecordBuffer = std::vector<
std::shared_ptr< std::vector< TaggedRecords< Type > > > >;
template < typename CategoryIterator >
static void
publish(Manager &manager, const CategoryIterator &categoriesBegin,
const CategoryIterator &categoriesEnd, bool clear)
{
if(categoriesBegin == categoriesEnd)
{
return;
}
RecordBuffer< double > doubleRecordBuffer;
RecordBuffer< int > intRecordBuffer;
SampleCache sampleCache;
absl::Time timeStamp = absl::Now();
absl::Duration now = absl::Now() - absl::UnixEpoch();
{
// 1.
absl::WriterMutexLock publishGuard(&manager.m_publishLock);
// 2.
absl::WriterMutexLock propertiesGuard(&manager.m_mutex);
// Build the 'sampleCache' by iterating over the categories and
// collecting records for those categories.
for(CategoryIterator catIt = categoriesBegin; catIt != categoriesEnd;
++catIt)
{
if(!(*catIt)->enabled())
{
continue;
}
// Collect the metrics.
auto result = collect(manager, *catIt, now, clear);
const auto &records = result.records;
// If there are no collected records then this category can be
// ignored.
if(records.doubleRecords.empty() && records.intRecords.empty())
{
continue;
}
if(result.samplePeriod == absl::Duration())
{
std::cerr << "Invalid elapsed time interval of 0 for "
"published metrics.";
result.samplePeriod += absl::Nanoseconds(1);
}
// Append the collected records to the buffer of records.
auto dRecords =
std::make_shared< DoubleRecords >(records.doubleRecords);
doubleRecordBuffer.push_back(dRecords);
SampleGroup< double > doubleGroup(
absl::Span< const TaggedRecords< double > >(*dRecords),
result.samplePeriod);
auto iRecords = std::make_shared< IntRecords >(records.intRecords);
intRecordBuffer.push_back(iRecords);
SampleGroup< int > intGroup(
absl::Span< const TaggedRecords< int > >(*iRecords),
result.samplePeriod);
std::for_each(manager.m_publishers.globalBegin(),
manager.m_publishers.globalEnd(),
[&](const auto &ptr) {
updateSampleCache(sampleCache, ptr, doubleGroup,
intGroup, timeStamp);
});
std::for_each(manager.m_publishers.lowerBound(*catIt),
manager.m_publishers.upperBound(*catIt),
[&](const auto &val) {
updateSampleCache(sampleCache, val.second,
doubleGroup, intGroup, timeStamp);
});
}
}
for(auto &entry : sampleCache)
{
Publisher *publisher = entry.first.get();
publisher->publish(entry.second);
}
}
};
Sample
Manager::collectSample(Records &records,
absl::Span< const Category * > categories,
bool clear)
{
absl::Time timeStamp = absl::Now();
absl::Duration now = timeStamp - absl::UnixEpoch();
Sample sample;
sample.sampleTime(timeStamp);
// Use a tuple to hold 'references' to the collected records
using SampleDescription = std::tuple< size_t, size_t, absl::Duration >;
std::vector< SampleDescription > dSamples;
std::vector< SampleDescription > iSamples;
dSamples.reserve(categories.size());
iSamples.reserve(categories.size());
// 1
absl::WriterMutexLock publishGuard(&m_publishLock);
// 2
absl::WriterMutexLock propertiesGuard(&m_mutex);
for(const Category *const category : categories)
{
if(!category->enabled())
{
continue;
}
size_t dBeginIndex = records.doubleRecords.size();
size_t iBeginIndex = records.intRecords.size();
// Collect the metrics.
auto collectRes = PublisherHelper::collect(*this, category, now, clear);
DoubleRecords catDRecords = collectRes.records.doubleRecords;
IntRecords catIRecords = collectRes.records.intRecords;
absl::Duration elapsedTime = collectRes.samplePeriod;
records.doubleRecords.insert(records.doubleRecords.end(),
catDRecords.begin(), catDRecords.end());
records.intRecords.insert(records.intRecords.end(), catIRecords.begin(),
catIRecords.end());
size_t dSize = records.doubleRecords.size() - dBeginIndex;
size_t iSize = records.intRecords.size() - iBeginIndex;
// If there are no collected records then this category can be ignored.
if(dSize != 0)
{
dSamples.emplace_back(dBeginIndex, dSize, elapsedTime);
}
if(iSize != 0)
{
iSamples.emplace_back(iBeginIndex, iSize, elapsedTime);
}
}
// Now that we have all the records, we can build our sample
for(const SampleDescription &s : dSamples)
{
sample.pushGroup(&records.doubleRecords[std::get< 0 >(s)],
std::get< 1 >(s), std::get< 2 >(s));
}
for(const SampleDescription &s : iSamples)
{
sample.pushGroup(&records.intRecords[std::get< 0 >(s)],
std::get< 1 >(s), std::get< 2 >(s));
}
return sample;
}
void
Manager::publish(absl::Span< const Category * > categories, bool clear)
{
PublisherHelper::publish(*this, categories.begin(), categories.end(),
clear);
}
void
Manager::publish(const std::set< const Category * > &categories, bool clear)
{
PublisherHelper::publish(*this, categories.begin(), categories.end(),
clear);
}
Manager *DefaultManager::m_manager = nullptr;
struct PublisherSchedulerData
{
util::Mutex m_mutex;
thread::Scheduler::Handle m_handle GUARDED_BY(m_mutex);
std::set< const Category * > m_categories GUARDED_BY(m_mutex);
bool m_default GUARDED_BY(m_mutex){false};
std::set< const Category * > m_nonDefaultCategories GUARDED_BY(m_mutex);
PublisherSchedulerData() : m_handle(thread::Scheduler::INVALID_HANDLE)
{
}
};
// Reverts a publisher scheduler back to its default state
class PublisherSchedulerGuard
{
PublisherScheduler *m_scheduler;
public:
PublisherSchedulerGuard(PublisherScheduler *scheduler)
: m_scheduler(scheduler)
{
}
~PublisherSchedulerGuard()
{
if(m_scheduler != nullptr)
{
for(auto &repeat : m_scheduler->m_repeaters)
{
if(repeat.second->m_handle != thread::Scheduler::INVALID_HANDLE)
{
m_scheduler->m_scheduler.cancelRepeat(repeat.second->m_handle);
}
}
m_scheduler->m_defaultInterval = absl::Duration();
m_scheduler->m_repeaters.clear();
m_scheduler->m_categories.clear();
}
}
void
release()
{
m_scheduler = nullptr;
}
};
void
PublisherScheduler::publish(
const std::shared_ptr< PublisherSchedulerData > &data) const
{
util::Lock l(&data->m_mutex);
if(data->m_default)
{
m_manager->publishAllExcluding(data->m_nonDefaultCategories);
}
else if(!data->m_categories.empty())
{
m_manager->publish(data->m_categories);
}
}
void
PublisherScheduler::cancel(Categories::iterator it)
{
assert(it != m_categories.end());
auto repeatIt = m_repeaters.find(it->second);
assert(repeatIt != m_repeaters.end());
const Category *category = it->first;
m_categories.erase(it);
auto data = repeatIt->second;
util::Lock l(&data->m_mutex);
assert(data->m_categories.find(category) != data->m_categories.end());
data->m_categories.erase(category);
if(!data->m_default)
{
if(data->m_categories.empty())
{
m_scheduler.cancelRepeat(data->m_handle);
m_repeaters.erase(repeatIt);
}
if(m_defaultInterval != absl::Duration())
{
auto defaultIntervalIt = m_repeaters.find(m_defaultInterval);
assert(defaultIntervalIt != m_repeaters.end());
auto &defaultRepeater = defaultIntervalIt->second;
util::Lock lock(&defaultRepeater->m_mutex);
defaultRepeater->m_nonDefaultCategories.erase(category);
}
}
}
bool
PublisherScheduler::cancelDefault()
{
if(m_defaultInterval == absl::Duration())
{
return false;
}
absl::Duration interval = m_defaultInterval;
m_defaultInterval = absl::Duration();
auto repeatIt = m_repeaters.find(interval);
assert(repeatIt != m_repeaters.end());
auto data = repeatIt->second;
util::Lock l(&data->m_mutex);
if(data->m_categories.empty())
{
assert(data->m_handle != thread::Scheduler::INVALID_HANDLE);
m_scheduler.cancelRepeat(data->m_handle);
m_repeaters.erase(repeatIt);
}
else
{
data->m_default = false;
data->m_nonDefaultCategories.clear();
}
return true;
}
void
PublisherScheduler::schedule(const Category *category,
absl::Duration interval)
{
assert(absl::Seconds(0) < interval);
util::Lock l(&m_mutex);
auto catIt = m_categories.find(category);
if(catIt != m_categories.end())
{
if(catIt->second == interval)
{
return;
}
cancel(catIt);
}
// Make a guard, so if something throws, the scheduler is reset to a
// somewhat "sane" state (no metrics).
PublisherSchedulerGuard guard(this);
m_categories.emplace(category, interval);
auto repeatIt = m_repeaters.find(interval);
std::shared_ptr< PublisherSchedulerData > data;
// Create a new 'ClockData' object if one does not exist for the
// 'interval', otherwise update the existing 'data'.
if(repeatIt == m_repeaters.end())
{
data = std::make_shared< PublisherSchedulerData >();
util::Lock lock(&data->m_mutex);
data->m_categories.insert(category);
m_repeaters.emplace(interval, data);
data->m_handle = m_scheduler.scheduleRepeat(
interval, std::bind(&PublisherScheduler::publish, this, data));
}
else
{
data = repeatIt->second;
util::Lock lock(&data->m_mutex);
data->m_categories.insert(category);
}
// If this isn't being added to the default schedule, then add to the set
// of non-default categories in the default schedule.
util::Lock dataLock(&data->m_mutex);
if(!data->m_default && m_defaultInterval != absl::Duration())
{
auto defaultIntervalIt = m_repeaters.find(m_defaultInterval);
assert(defaultIntervalIt != m_repeaters.end());
auto &defaultInterval = defaultIntervalIt->second;
util::Lock lock(&defaultInterval->m_mutex);
defaultInterval->m_nonDefaultCategories.insert(category);
}
guard.release();
}
void
PublisherScheduler::setDefault(absl::Duration interval)
{
assert(absl::Seconds(0) < interval);
util::Lock l(&m_mutex);
// If its already this interval, return early.
if(interval == m_defaultInterval)
{
return;
}
cancelDefault();
m_defaultInterval = interval;
// Make a guard, so if something throws, the scheduler is reset to a
// somewhat "sane" state (no metrics).
PublisherSchedulerGuard guard(this);
std::shared_ptr< PublisherSchedulerData > data;
auto repeatIt = m_repeaters.find(interval);
if(repeatIt == m_repeaters.end())
{
data = std::make_shared< PublisherSchedulerData >();
m_repeaters.emplace(interval, data);
}
else
{
data = repeatIt->second;
}
util::Lock lock(&data->m_mutex);
data->m_default = true;
auto cIt = m_categories.begin();
for(; cIt != m_categories.end(); ++cIt)
{
if(cIt->second != interval)
{
data->m_nonDefaultCategories.insert(cIt->first);
}
}
if(data->m_handle == thread::Scheduler::INVALID_HANDLE)
{
data->m_handle = m_scheduler.scheduleRepeat(
interval, std::bind(&PublisherScheduler::publish, this, data));
}
guard.release();
}
bool
PublisherScheduler::cancel(const Category *category)
{
util::Lock l(&m_mutex);
auto it = m_categories.find(category);
if(it == m_categories.end())
{
// This category has no specific schedule.
return false;
}
cancel(it);
return true;
}
bool
PublisherScheduler::clearDefault()
{
util::Lock l(&m_mutex);
return cancelDefault();
}
void
PublisherScheduler::cancelAll()
{
util::Lock l(&m_mutex);
for(auto &repeat : m_repeaters)
{
util::Lock dataLock(&repeat.second->m_mutex);
m_scheduler.cancelRepeat(repeat.second->m_handle, true);
}
m_defaultInterval = absl::Duration();
m_repeaters.clear();
m_categories.clear();
}
nonstd::optional< absl::Duration >
PublisherScheduler::find(const Category *category) const
{
util::Lock l(&m_mutex);
auto it = m_categories.find(category);
if(it == m_categories.end())
{
return {};
}
return it->second;
}
nonstd::optional< absl::Duration >
PublisherScheduler::getDefault() const
{
util::Lock l(&m_mutex);
if(m_defaultInterval == absl::Duration())
{
return {};
}
return m_defaultInterval;
}
std::vector< std::pair< const Category *, absl::Duration > >
PublisherScheduler::getAll() const
{
util::Lock l(&m_mutex);
std::vector< std::pair< const Category *, absl::Duration > > result;
result.reserve(m_categories.size());
std::copy(m_categories.begin(), m_categories.end(),
std::back_inserter(result));
return result;
}
} // namespace metrics
} // namespace llarp

File diff suppressed because it is too large Load diff

View file

@ -1,178 +0,0 @@
#include <util/metrics/json_publisher.hpp>
#include <fstream>
#include <iomanip>
#include <iostream>
namespace llarp
{
namespace metrics
{
namespace
{
nlohmann::json
tagsToJson(const Tags &tags)
{
nlohmann::json result;
std::for_each(tags.begin(), tags.end(), [&](const auto &tag) {
absl::visit([&](const auto &t) { result[tag.first] = t; },
tag.second);
});
return result;
}
template < typename Value >
nlohmann::json
formatValue(const Record< Value > &record, const Tags &tags,
double elapsedTime, Publication::Type publicationType)
{
switch(publicationType)
{
case Publication::Type::Unspecified:
{
assert(false && "Invalid publication type");
}
break;
case Publication::Type::Total:
{
return {{"tags", tagsToJson(tags)}, {"total", record.total()}};
}
break;
case Publication::Type::Count:
{
return {{"tags", tagsToJson(tags)}, {"count", record.count()}};
}
break;
case Publication::Type::Min:
{
return {{"tags", tagsToJson(tags)}, {"min", record.min()}};
}
break;
case Publication::Type::Max:
{
return {{"tags", tagsToJson(tags)}, {"max", record.max()}};
}
break;
case Publication::Type::Avg:
{
return {{"tags", tagsToJson(tags)},
{"avg", record.total() / record.count()}};
}
break;
case Publication::Type::Rate:
{
return {{"tags", tagsToJson(tags)},
{"rate", record.total() / elapsedTime}};
}
break;
case Publication::Type::RateCount:
{
return {{"tags", tagsToJson(tags)},
{"rateCount", record.count() / elapsedTime}};
}
break;
}
return {};
}
template < typename Value >
nlohmann::json
recordToJson(const TaggedRecords< Value > &taggedRecord,
double elapsedTime)
{
nlohmann::json result;
result["id"] = taggedRecord.id.toString();
auto publicationType = taggedRecord.id.description()->type();
for(const auto &rec : taggedRecord.data)
{
const auto &record = rec.second;
if(publicationType != Publication::Type::Unspecified)
{
result["publicationType"] = Publication::repr(publicationType);
result["metrics"].push_back(
formatValue(record, rec.first, elapsedTime, publicationType));
}
else
{
nlohmann::json tmp;
tmp["tags"] = tagsToJson(rec.first);
tmp["count"] = record.count();
tmp["total"] = record.total();
if(Record< Value >::DEFAULT_MIN() != record.min())
{
tmp["min"] = record.min();
}
if(Record< Value >::DEFAULT_MAX() == record.max())
{
tmp["max"] = record.max();
}
result["metrics"].push_back(tmp);
}
}
return result;
}
} // namespace
void
JsonPublisher::publish(const Sample &values)
{
if(values.recordCount() == 0)
{
// nothing to publish
return;
}
nlohmann::json result;
result["sampleTime"] = absl::UnparseFlag(values.sampleTime());
result["recordCount"] = values.recordCount();
auto gIt = values.begin();
auto prev = values.begin();
for(; gIt != values.end(); ++gIt)
{
const double elapsedTime = absl::ToDoubleSeconds(samplePeriod(*gIt));
if(gIt == prev || samplePeriod(*gIt) != samplePeriod(*prev))
{
result["elapsedTime"] = elapsedTime;
}
absl::visit(
[&](const auto &x) -> void {
for(const auto &record : x)
{
result["record"].emplace_back(
recordToJson(record, elapsedTime));
}
},
*gIt);
prev = gIt;
}
m_publish(result);
}
void
JsonPublisher::directoryPublisher(const nlohmann::json &result,
const fs::path &path)
{
std::ofstream fstream(path.string(), std::ios_base::app);
if(!fstream)
{
std::cerr << "Skipping metrics publish, " << path << " is not a file\n";
return;
}
fstream << std::setw(0) << result << '\n';
fstream.close();
}
} // namespace metrics
} // namespace llarp

View file

@ -1,40 +0,0 @@
#ifndef LLARP_METRICS_JSON_PUBLISHER_HPP
#define LLARP_METRICS_JSON_PUBLISHER_HPP
#include <util/fs.hpp>
#include <util/metrics/core.hpp>
#include <nlohmann/json.hpp>
#include <functional>
#include <iosfwd>
#include <utility>
namespace llarp
{
namespace metrics
{
class JsonPublisher final : public Publisher
{
public:
using PublishFunction = std::function< void(const nlohmann::json&) >;
private:
PublishFunction m_publish;
public:
JsonPublisher(PublishFunction publish) : m_publish(std::move(publish))
{
}
~JsonPublisher() override = default;
void
publish(const Sample& values) override;
static void
directoryPublisher(const nlohmann::json& result, const fs::path& path);
};
} // namespace metrics
} // namespace llarp
#endif

View file

@ -1 +0,0 @@
#include <util/metrics/metrics.hpp>

View file

@ -1,80 +0,0 @@
#ifndef LLARP_METRICS_HPP
#define LLARP_METRICS_HPP
#include <util/metrics/types.hpp>
#include <util/metrics/core.hpp>
#include <util/string_view.hpp>
namespace llarp
{
namespace metrics
{
struct MetricsHelper
{
static void
initContainer(CategoryContainer& container, const char* category)
{
Manager* manager = DefaultManager::instance();
Registry& registry = manager->registry();
registry.registerContainer(registry.get(category), container);
}
static void
setType(const Id& id, Publication::Type type)
{
Manager* manager = DefaultManager::instance();
return manager->registry().publicationType(id, type);
}
};
template < typename... TagVals >
void
integerTick(string_view category, string_view metric, int val,
TagVals&&... tags)
{
if(DefaultManager::instance())
{
CollectorRepo< int >& repository =
DefaultManager::instance()->intCollectorRepo();
IntCollector* collector = repository.defaultCollector(category, metric);
if(collector->id().category()->enabled())
{
collector->tick(val, tags...);
}
}
}
} // namespace metrics
} // namespace llarp
// Some MSVC flags mess with __LINE__, but __COUNTER__ is better anyway
#ifdef _MSC_VER
#define METRICS_UNIQ_NUMBER __COUNTER__
#else
#define METRICS_UNIQ_NUMBER __LINE__
#endif
// Use a level of indirection to force the preprocessor to expand args first.
#define METRICS_NAME_CAT_IMP(X, Y) X##Y
#define METRICS_NAME_CAT(X, Y) METRICS_NAME_CAT_IMP(X, Y)
#define METRICS_UNIQUE_NAME(X) METRICS_NAME_CAT(X, METRICS_UNIQ_NUMBER)
// For when the category/metric may change during the program run
#define METRICS_DYNAMIC_UPDATE(CAT, METRIC, ...) \
do \
{ \
using namespace llarp::metrics; \
if(DefaultManager::instance()) \
{ \
CollectorRepo< double >& repository = \
DefaultManager::instance()->doubleCollectorRepo(); \
DoubleCollector* collector = \
repository.defaultCollector((CAT), (METRIC)); \
if(collector->id().category()->enabled()) \
{ \
collector->tick(__VA_ARGS__); \
} \
} \
} while(false)
#endif

View file

@ -1,430 +0,0 @@
#include <util/metrics/metrictank_publisher.hpp>
#include <util/logging/logger.hpp>
#include <util/meta/variant.hpp>
#include <cstdio>
#include <absl/strings/str_cat.h>
#include <absl/strings/str_join.h>
#ifndef _WIN32
#include <netdb.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/types.h>
// bzero and friends graduated from /usr/ucb*
// not too long ago
#include <strings.h>
#else
#ifndef WIN32_LEAN_AND_MEAN
#define WIN32_LEAN_AND_MEAN
#endif
#include <windows.h>
#include <winsock2.h>
#include <ws2tcpip.h>
#include <wspiapi.h>
#include <lmcons.h>
#endif
namespace llarp
{
namespace metrics
{
namespace
{
nonstd::optional< std::string >
makeStr(double d)
{
if(std::isnan(d) || std::isinf(d))
{
return {};
}
return std::to_string(d);
}
nonstd::optional< std::string >
makeStr(int i)
{
if(i == std::numeric_limits< int >::min()
|| i == std::numeric_limits< int >::max())
{
return {};
}
return std::to_string(i);
}
template < typename Value >
nonstd::optional< std::string >
formatValue(const Record< Value > &record, double elapsedTime,
Publication::Type publicationType)
{
switch(publicationType)
{
case Publication::Type::Unspecified:
{
assert(false && "Invalid publication type");
}
break;
case Publication::Type::Total:
{
return makeStr(record.total());
}
break;
case Publication::Type::Count:
{
return std::to_string(record.count());
}
break;
case Publication::Type::Min:
{
return makeStr(record.min());
}
break;
case Publication::Type::Max:
{
return makeStr(record.max());
}
break;
case Publication::Type::Avg:
{
return makeStr(static_cast< double >(record.total())
/ static_cast< double >(record.count()));
}
break;
case Publication::Type::Rate:
{
return makeStr(record.total() / elapsedTime);
}
break;
case Publication::Type::RateCount:
{
return makeStr(record.count() / elapsedTime);
}
break;
}
assert(false && "Invalid publication type");
return {};
}
std::string
makeTagStr(const Tags &tags)
{
std::string tagStr;
auto overloaded = util::overloaded(
[](const std::string &str) { return str; },
[](double d) { return std::to_string(d); },
[](const std::int64_t i) { return std::to_string(i); });
for(const auto &tag : tags)
{
absl::StrAppend(&tagStr, ";", tag.first, "=",
absl::visit(overloaded, tag.second));
}
if(!tags.empty())
{
absl::StrAppend(&tagStr, ";");
}
return tagStr;
}
std::string
addName(string_view id, string_view name, const Tags &tags,
string_view suffix)
{
return absl::StrCat(id, ".", name, makeTagStr(tags), suffix);
}
constexpr bool
isValid(int val)
{
return val != std::numeric_limits< int >::min()
&& val != std::numeric_limits< int >::max();
}
constexpr bool
isValid(double val)
{
return Record< double >::DEFAULT_MIN() != val
&& Record< double >::DEFAULT_MAX() != val && !std::isnan(val)
&& !std::isinf(val);
}
template < typename Value >
std::vector< MetricTankPublisherInterface::PublishData >
recordToData(const TaggedRecords< Value > &taggedRecords, absl::Time time,
double elapsedTime, string_view suffix)
{
std::vector< MetricTankPublisherInterface::PublishData > result;
std::string id = taggedRecords.id.toString();
auto publicationType = taggedRecords.id.description()->type();
for(const auto &record : taggedRecords.data)
{
const auto &tags = record.first;
const auto &rec = record.second;
if(publicationType != Publication::Type::Unspecified)
{
auto val = formatValue(rec, elapsedTime, publicationType);
if(val)
{
result.emplace_back(
addName(id, Publication::repr(publicationType), tags, suffix),
val.value(), time);
}
}
else
{
result.emplace_back(addName(id, "count", tags, suffix),
std::to_string(rec.count()), time);
result.emplace_back(addName(id, "total", tags, suffix),
std::to_string(rec.total()), time);
if(isValid(rec.min()))
{
result.emplace_back(addName(id, "min", tags, suffix),
std::to_string(rec.min()), time);
}
if(isValid(rec.max()))
{
result.emplace_back(addName(id, "max", tags, suffix),
std::to_string(rec.max()), time);
}
}
}
return result;
}
#ifndef _WIN32
void
publishData(const std::vector< std::string > &toSend,
const std::string &host, short port)
{
struct addrinfo hints;
struct addrinfo *addrs;
bzero(&hints, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
const std::string portAsStr = std::to_string(port);
if(getaddrinfo(host.c_str(), portAsStr.c_str(), &hints, &addrs) != 0)
{
LogError("Failed to get address info");
return;
}
int sock =
::socket(addrs->ai_family, addrs->ai_socktype, addrs->ai_protocol);
if(sock < 0)
{
LogError("Failed to open socket");
freeaddrinfo(addrs);
return;
}
if(connect(sock, addrs->ai_addr, addrs->ai_addrlen) < 0)
{
LogError("Failed to connect to metrictank");
close(sock);
freeaddrinfo(addrs);
return;
}
freeaddrinfo(addrs);
for(const std::string &val : toSend)
{
ssize_t sentLen = 0;
do
{
sentLen =
::send(sock, val.c_str() + sentLen, val.size() - sentLen, 0);
if(sentLen == -1)
{
LogError("Error ", strerror(errno));
}
} while((0 <= sentLen)
&& (static_cast< size_t >(sentLen) < val.size()));
}
LogInfo("Sent ", toSend.size(), " metrics to metrictank");
shutdown(sock, SHUT_RDWR);
close(sock);
}
#else
void
publishData(const std::vector< std::string > &toSend,
const std::string &host, short port)
{
struct addrinfo *addrs = NULL, hints;
ZeroMemory(&hints, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
const std::string portAsStr = std::to_string(port);
if(getaddrinfo(host.c_str(), portAsStr.c_str(), &hints, &addrs) != 0)
{
LogError("Failed to get address info");
return;
}
SOCKET sock =
::socket(addrs->ai_family, addrs->ai_socktype, addrs->ai_protocol);
if(sock == INVALID_SOCKET)
{
LogError("Failed to open socket");
freeaddrinfo(addrs);
return;
}
if(connect(sock, addrs->ai_addr, addrs->ai_addrlen) == SOCKET_ERROR)
{
LogError("Failed to connect to metrictank");
closesocket(sock);
freeaddrinfo(addrs);
return;
}
freeaddrinfo(addrs);
for(const std::string &val : toSend)
{
int sentLen = 0;
do
{
sentLen =
::send(sock, val.c_str() + sentLen, val.size() - sentLen, 0);
if(sentLen == SOCKET_ERROR)
{
LogError("Error ", strerror(errno));
}
} while((0 <= sentLen)
&& (static_cast< size_t >(sentLen) < val.size()));
}
shutdown(sock, SD_SEND);
closesocket(sock);
}
#endif
MetricTankPublisherInterface::Tags
updateTags(MetricTankPublisherInterface::Tags tags)
{
if(tags.count("system") == 0)
{
#if defined(_WIN32) || defined(_WIN64) || defined(__NT__)
tags["system"] = "windows";
#elif defined(__APPLE__)
tags["system"] = "macos";
#elif defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__)
tags["system"] = "bsd";
#elif defined(__sun)
tags["system"] = "solaris";
#elif defined(__linux__)
tags["system"] = "linux";
#else
tags["system"] = "unknown";
#endif
}
return tags;
}
} // namespace
std::string
MetricTankPublisherInterface::makeSuffix(const Tags &tags)
{
return absl::StrJoin(updateTags(tags), ";", absl::PairFormatter("="));
}
void
MetricTankPublisherInterface::publish(const Sample &values)
{
if(values.recordCount() == 0)
{
// nothing to publish
return;
}
absl::Time sampleTime = values.sampleTime();
std::vector< PublishData > result;
result.reserve(values.recordCount());
auto gIt = values.begin();
auto prev = values.begin();
for(; gIt != values.end(); ++gIt)
{
const double elapsedTime = absl::ToDoubleSeconds(samplePeriod(*gIt));
absl::visit(
[&](const auto &d) {
for(const auto &record : d)
{
auto partial =
recordToData(record, sampleTime, elapsedTime, m_suffix);
result.insert(result.end(), partial.begin(), partial.end());
}
},
*gIt);
prev = gIt;
}
publish(result);
}
void
MetricTankPublisher::publish(const std::vector< PublishData > &data)
{
if(m_queue.tryPushBack(data) == thread::QueueReturn::QueueFull)
{
LogWarn("Dropping metrictank logs!");
}
}
void
MetricTankPublisher::work()
{
while(true)
{
auto data = m_queue.popFront(); // block until we get something
// Finish
if(absl::holds_alternative< StopStruct >(data))
{
return;
}
assert(absl::holds_alternative< std::vector< PublishData > >(data));
auto vec = absl::get< std::vector< PublishData > >(data);
std::vector< std::string > toSend;
toSend.reserve(vec.size());
std::transform(vec.begin(), vec.end(), std::back_inserter(toSend),
[](const PublishData &d) -> std::string {
return absl::StrCat(
std::get< 0 >(d), " ", std::get< 1 >(d), " ",
absl::ToUnixSeconds(std::get< 2 >(d)), "\n");
});
publishData(toSend, m_host, m_port);
}
}
} // namespace metrics
} // namespace llarp

View file

@ -1,90 +0,0 @@
#ifndef LLARP_METRICS_METRICTANK_PUBLISHER_HPP
#define LLARP_METRICS_METRICTANK_PUBLISHER_HPP
#include <util/metrics/core.hpp>
#include <util/thread/queue.hpp>
#include <absl/types/variant.h>
#include <string>
#include <thread>
#include <tuple>
#include <utility>
#include <vector>
namespace llarp
{
namespace metrics
{
class MetricTankPublisherInterface : public Publisher
{
public:
// Format for metrictank is <metric path, value, seconds since epoch>
// Metric path = metrics.namespaces.metric;key=value;key1=value2
using PublishData = std::tuple< std::string, std::string, absl::Time >;
using Tags = std::map< std::string, std::string >;
private:
const std::string m_suffix; // tags to send to metric tank
public:
MetricTankPublisherInterface(const Tags& tags)
: m_suffix(makeSuffix(tags))
{
}
~MetricTankPublisherInterface() override = default;
static std::string
makeSuffix(const Tags& tags);
void
publish(const Sample& values) override;
virtual void
publish(const std::vector< PublishData >& data) = 0;
};
class MetricTankPublisher final : public MetricTankPublisherInterface
{
private:
const std::string m_host;
const short m_port;
struct StopStruct
{
};
using Queue = thread::Queue<
absl::variant< std::vector< PublishData >, StopStruct > >;
Queue m_queue; // queue of things to publish
std::thread m_worker; // worker thread
void
work();
public:
MetricTankPublisher(const Tags& tags, std::string host, short port)
: MetricTankPublisherInterface(tags)
, m_host(std::move(host))
, m_port(port)
, m_queue(100)
, m_worker(&MetricTankPublisher::work, this)
{
}
~MetricTankPublisher() override
{
// Push back a signal value onto the queue
m_queue.pushBack(StopStruct());
}
void
publish(const std::vector< PublishData >& data) override;
};
} // namespace metrics
} // namespace llarp
#endif

View file

@ -1,196 +0,0 @@
#include <util/metrics/stream_publisher.hpp>
#include <fstream>
#include <iostream>
#include <iomanip>
namespace llarp
{
namespace metrics
{
namespace
{
template < typename Value >
void
formatValue(std::ostream &stream, Value value,
const FormatSpec *formatSpec)
{
if(formatSpec)
{
FormatSpec::format(stream, static_cast< double >(value), *formatSpec);
}
else
{
stream << value;
}
}
template < typename Value >
void
formatValue(std::ostream &stream, const Record< Value > &record,
double elapsedTime, Publication::Type publicationType,
const FormatSpec *formatSpec)
{
switch(publicationType)
{
case Publication::Type::Unspecified:
{
assert(false && "Invalid publication type");
}
break;
case Publication::Type::Total:
{
formatValue(stream, record.total(), formatSpec);
}
break;
case Publication::Type::Count:
{
formatValue(stream, record.count(), formatSpec);
}
break;
case Publication::Type::Min:
{
formatValue(stream, record.min(), formatSpec);
}
break;
case Publication::Type::Max:
{
formatValue(stream, record.max(), formatSpec);
}
break;
case Publication::Type::Avg:
{
formatValue(stream, record.total() / record.count(), formatSpec);
}
break;
case Publication::Type::Rate:
{
formatValue(stream, record.total() / elapsedTime, formatSpec);
}
break;
case Publication::Type::RateCount:
{
formatValue(stream, record.count() / elapsedTime, formatSpec);
}
break;
}
}
template < typename Value >
void
publishRecord(std::ostream &stream,
const TaggedRecords< Value > &taggedRecords,
double elapsedTime)
{
auto publicationType = taggedRecords.id.description()->type();
std::shared_ptr< const Format > format =
taggedRecords.id.description()->format();
if(taggedRecords.data.empty())
{
return;
}
stream << "\t\t" << taggedRecords.id << " [";
for(const auto &rec : taggedRecords.data)
{
stream << "\n\t\t\t";
const auto &tags = rec.first;
const auto &record = rec.second;
{
Printer printer(stream, -1, -1);
printer.printValue(tags);
}
stream << " ";
if(publicationType != Publication::Type::Unspecified)
{
stream << Publication::repr(publicationType) << " = ";
const FormatSpec *formatSpec =
format ? format->specFor(publicationType) : nullptr;
formatValue(stream, record, elapsedTime, publicationType,
formatSpec);
}
else
{
const FormatSpec *countSpec = nullptr;
const FormatSpec *totalSpec = nullptr;
const FormatSpec *minSpec = nullptr;
const FormatSpec *maxSpec = nullptr;
if(format)
{
countSpec = format->specFor(Publication::Type::Count);
totalSpec = format->specFor(Publication::Type::Total);
minSpec = format->specFor(Publication::Type::Min);
maxSpec = format->specFor(Publication::Type::Max);
}
stream << "count = ";
formatValue(stream, record.count(), countSpec);
stream << ", total = ";
formatValue(stream, record.total(), totalSpec);
if(Record< Value >::DEFAULT_MIN() == record.min())
{
stream << ", min = undefined";
}
else
{
stream << ", min = ";
formatValue(stream, record.min(), minSpec);
}
if(Record< Value >::DEFAULT_MAX() == record.max())
{
stream << ", max = undefined";
}
else
{
stream << ", max = ";
formatValue(stream, record.max(), maxSpec);
}
}
}
stream << "\n\t\t]\n";
}
} // namespace
void
StreamPublisher::publish(const Sample &values)
{
if(values.recordCount() == 0)
{
// nothing to publish
return;
}
m_stream << values.sampleTime() << " " << values.recordCount()
<< " Records\n";
auto gIt = values.begin();
auto prev = values.begin();
for(; gIt != values.end(); ++gIt)
{
const double elapsedTime = absl::ToDoubleSeconds(samplePeriod(*gIt));
if(gIt == prev || samplePeriod(*gIt) != samplePeriod(*prev))
{
m_stream << "\tElapsed Time: " << elapsedTime << "s\n";
}
absl::visit(
[&](const auto &x) {
for(const auto &record : x)
{
publishRecord(m_stream, record, elapsedTime);
}
},
*gIt);
prev = gIt;
}
}
} // namespace metrics
} // namespace llarp

View file

@ -1,30 +0,0 @@
#ifndef LLARP_METRICS_STREAM_PUBLISHER_HPP
#define LLARP_METRICS_STREAM_PUBLISHER_HPP
#include <util/metrics/core.hpp>
#include <iosfwd>
namespace llarp
{
namespace metrics
{
class StreamPublisher final : public Publisher
{
std::ostream& m_stream;
public:
StreamPublisher(std::ostream& stream) : m_stream(stream)
{
}
~StreamPublisher() override = default;
void
publish(const Sample& values) override;
};
} // namespace metrics
} // namespace llarp
#endif

View file

@ -1,145 +0,0 @@
#include <util/metrics/types.hpp>
#include <util/printer.hpp>
#include <absl/strings/str_join.h>
namespace llarp
{
namespace metrics
{
std::ostream &
FormatSpec::format(std::ostream &stream, double data,
const FormatSpec &format)
{
static constexpr size_t INIT_SIZE = 32;
char buf[INIT_SIZE] = {0};
int rc = snprintf(buf, INIT_SIZE, format.m_format.data(),
data * format.m_scale);
if(rc < 0)
{
stream << "Bad format " << format.m_format << " applied to " << data;
return stream;
}
if(static_cast< size_t >(rc) < INIT_SIZE)
{
stream << buf;
return stream;
}
std::vector< char > vec(rc + 1);
rc = snprintf(vec.data(), vec.size(), format.m_format.data(),
data * format.m_scale);
if(static_cast< size_t >(rc) > vec.size())
{
stream << "Bad format " << format.m_format << " applied to " << data;
return stream;
}
stream << vec.data();
return stream;
}
string_view
Publication::repr(Type val)
{
switch(val)
{
case Type::Unspecified:
return "Unspecified";
case Type::Total:
return "Total";
case Type::Count:
return "Count";
case Type::Min:
return "Min";
case Type::Max:
return "Max";
case Type::Avg:
return "Avg";
case Type::Rate:
return "Rate";
case Type::RateCount:
return "RateCount";
default:
return "???";
}
}
std::ostream &
Publication::print(std::ostream &stream, Type val)
{
stream << repr(val);
return stream;
}
Category::~Category()
{
while(m_container)
{
auto next = m_container->m_nextCategory;
m_container->clear();
m_container = next;
}
}
void
Category::enabled(bool val)
{
// sync point
if(m_enabled != val)
{
auto cont = m_container;
while(cont)
{
cont->m_enabled = val;
cont = cont->m_nextCategory;
}
m_enabled = val;
}
}
void
Category::registerContainer(CategoryContainer *container)
{
container->m_enabled = m_enabled;
container->m_category = this;
container->m_nextCategory = m_container;
m_container = container;
}
std::ostream &
Category::print(std::ostream &stream, int level, int spaces) const
{
Printer printer(stream, level, spaces);
printer.printAttribute("name", m_name);
printer.printAttribute("enabled",
m_enabled.load(std::memory_order_relaxed));
return stream;
}
std::string
Description::toString() const
{
util::Lock l(&m_mutex);
return absl::StrCat(m_category->name(), ".", m_name);
}
std::ostream &
Description::print(std::ostream &stream) const
{
util::Lock l(&m_mutex);
stream << m_category->name() << '.' << m_name;
return stream;
}
} // namespace metrics
} // namespace llarp

View file

@ -1,675 +0,0 @@
#ifndef LLARP_METRICS_TYPES_HPP
#define LLARP_METRICS_TYPES_HPP
#include <util/printer.hpp>
#include <util/string_view.hpp>
#include <util/thread/threading.hpp>
#include <util/types.hpp>
#include <util/meta/variant.hpp>
#include <absl/container/flat_hash_map.h>
#include <absl/container/flat_hash_set.h>
#include <absl/hash/hash.h>
#include <nonstd/optional.hpp>
#include <absl/types/span.h>
#include <absl/types/variant.h>
#include <cstring>
#include <iosfwd>
#include <memory>
#include <set>
#include <vector>
namespace llarp
{
namespace metrics
{
struct Publication
{
enum class Type : byte_t
{
Unspecified = 0, // no associated metric type
Total, // sum of seen values in the measurement period
Count, // count of seen events
Min, // Minimum value
Max, // Max value
Avg, // total / count
Rate, // total per second
RateCount // count per second
};
enum
{
MaxSize = static_cast< byte_t >(Type::RateCount) + 1
};
static string_view
repr(Type val);
static std::ostream &
print(std::ostream &stream, Type val);
};
struct FormatSpec
{
float m_scale{1.0};
string_view m_format;
static constexpr char DEFAULT_FORMAT[] = "%f";
constexpr FormatSpec() : m_format(DEFAULT_FORMAT)
{
}
constexpr FormatSpec(float scale, string_view format)
: m_scale(scale), m_format(format)
{
}
static std::ostream &
format(std::ostream &stream, double data, const FormatSpec &spec);
};
inline bool
operator==(const FormatSpec &lhs, const FormatSpec &rhs)
{
return std::make_tuple(lhs.m_scale, lhs.m_format)
== std::make_tuple(rhs.m_scale, rhs.m_format);
}
struct Format
{
using Spec = nonstd::optional< FormatSpec >;
std::array< Spec, Publication::MaxSize > m_specs;
constexpr Format() : m_specs()
{
}
void
setSpec(Publication::Type pub, const FormatSpec &spec)
{
m_specs[static_cast< size_t >(pub)].emplace(spec);
}
void
clear()
{
for(auto &s : m_specs)
s.reset();
}
constexpr const FormatSpec *
specFor(Publication::Type val) const
{
const auto &spec = m_specs[static_cast< size_t >(val)];
return spec ? &spec.value() : nullptr;
}
};
inline bool
operator==(const Format &lhs, const Format &rhs)
{
return lhs.m_specs == rhs.m_specs;
}
struct CategoryContainer;
/// Represents a category of grouped metrics
class Category
{
string_view m_name;
std::atomic_bool m_enabled;
CategoryContainer *m_container;
public:
Category(string_view name, bool enabled = true)
: m_name(name), m_enabled(enabled), m_container(nullptr)
{
}
~Category();
void
enabled(bool flag);
void
registerContainer(CategoryContainer *container);
const std::atomic_bool &
enabledRaw() const
{
return m_enabled;
}
string_view
name() const
{
return m_name;
}
bool
enabled() const
{
return m_enabled;
}
std::ostream &
print(std::ostream &stream, int level, int spaces) const;
};
inline std::ostream &
operator<<(std::ostream &stream, const Category &c)
{
return c.print(stream, -1, -1);
}
struct CategoryContainer
{
bool m_enabled;
const Category *m_category;
CategoryContainer *m_nextCategory;
constexpr void
clear()
{
m_enabled = false;
m_category = nullptr;
m_nextCategory = nullptr;
}
};
class Description
{
mutable util::Mutex m_mutex;
const Category *m_category GUARDED_BY(m_mutex);
string_view m_name GUARDED_BY(m_mutex);
Publication::Type m_type GUARDED_BY(m_mutex);
std::shared_ptr< Format > m_format GUARDED_BY(m_mutex);
Description(const Description &) = delete;
Description &
operator=(const Description &) = delete;
public:
Description(const Category *category, string_view name)
: m_category(category)
, m_name(name)
, m_type(Publication::Type::Unspecified)
{
}
void
category(const Category *c) LOCKS_EXCLUDED(m_mutex)
{
util::Lock l(&m_mutex);
m_category = c;
}
const Category *
category() const LOCKS_EXCLUDED(m_mutex)
{
util::Lock l(&m_mutex);
return m_category;
}
void
name(string_view n) LOCKS_EXCLUDED(m_mutex)
{
util::Lock l(&m_mutex);
m_name = n;
}
string_view
name() const LOCKS_EXCLUDED(m_mutex)
{
util::Lock l(&m_mutex);
return m_name;
}
void
type(Publication::Type t) LOCKS_EXCLUDED(m_mutex)
{
util::Lock l(&m_mutex);
m_type = t;
}
Publication::Type
type() const LOCKS_EXCLUDED(m_mutex)
{
util::Lock l(&m_mutex);
return m_type;
}
void
format(const std::shared_ptr< Format > &f) LOCKS_EXCLUDED(m_mutex)
{
util::Lock l(&m_mutex);
m_format = f;
}
std::shared_ptr< Format >
format() const LOCKS_EXCLUDED(m_mutex)
{
util::Lock l(&m_mutex);
return m_format;
}
std::string
toString() const;
std::ostream &
print(std::ostream &stream) const;
};
inline std::ostream &
operator<<(std::ostream &stream, const Description &d)
{
return d.print(stream);
}
/// A metric id is what we will actually deal with in terms of metrics, in
/// order to make things like static initialisation cleaner.
class Id
{
const Description *m_description{nullptr};
public:
constexpr Id() = default;
constexpr Id(const Description *description) : m_description(description)
{
}
constexpr const Description *&
description()
{
return m_description;
}
constexpr const Description *const &
description() const
{
return m_description;
}
bool
valid() const noexcept
{
return m_description != nullptr;
}
explicit operator bool() const noexcept
{
return valid();
}
const Category *
category() const
{
assert(valid());
return m_description->category();
}
string_view
categoryName() const
{
assert(valid());
return m_description->category()->name();
}
string_view
metricName() const
{
assert(valid());
return m_description->name();
}
std::string
toString() const
{
if(m_description)
{
return m_description->toString();
;
}
return "INVALID_METRIC";
}
std::ostream &
print(std::ostream &stream, int, int) const
{
if(m_description)
{
stream << *m_description;
}
else
{
stream << "INVALID_METRIC";
}
return stream;
}
};
inline bool
operator==(const Id &lhs, const Id &rhs)
{
return lhs.description() == rhs.description();
}
inline bool
operator<(const Id &lhs, const Id &rhs)
{
return lhs.description() < rhs.description();
}
inline std::ostream &
operator<<(std::ostream &stream, const Id &id)
{
return id.print(stream, -1, -1);
}
// clang-format off
// Forwarding class to specialise for metric types
template<typename Type>
struct RecordMax {
};
template<>
struct RecordMax<double> {
static constexpr double min() { return std::numeric_limits< double >::infinity(); }
static constexpr double max() { return -std::numeric_limits< double >::infinity(); }
};
template<>
struct RecordMax<int> {
static constexpr int min() { return std::numeric_limits< int >::max(); }
static constexpr int max() { return std::numeric_limits< int >::min(); }
};
// clang-format on
template < typename Type >
class Record
{
size_t m_count{0};
Type m_total;
Type m_min;
Type m_max;
public:
// clang-format off
static constexpr Type DEFAULT_MIN() { return RecordMax<Type>::min(); };
static constexpr Type DEFAULT_MAX() { return RecordMax<Type>::max(); };
// clang-format on
Record() : m_total(0.0), m_min(DEFAULT_MIN()), m_max(DEFAULT_MAX())
{
}
Record(size_t count, double total, double min, double max)
: m_count(count), m_total(total), m_min(min), m_max(max)
{
}
// clang-format off
size_t count() const { return m_count; }
size_t& count() { return m_count; }
Type total() const { return m_total; }
Type& total() { return m_total; }
Type min() const { return m_min; }
Type& min() { return m_min; }
Type max() const { return m_max; }
Type& max() { return m_max; }
// clang-format on
void
clear()
{
m_count = 0;
m_total = 0;
m_min = DEFAULT_MIN();
m_max = DEFAULT_MAX();
}
std::ostream &
print(std::ostream &stream, int level, int spaces) const
{
Printer printer(stream, level, spaces);
printer.printAttribute("count", m_count);
printer.printAttribute("total", m_total);
printer.printAttribute("min", m_min);
printer.printAttribute("max", m_max);
return stream;
}
};
template < typename Type >
inline std::ostream &
operator<<(std::ostream &stream, const Record< Type > &rec)
{
return rec.print(stream, -1, -1);
}
template < typename Type >
inline bool
operator==(const Record< Type > &lhs, const Record< Type > &rhs)
{
return std::make_tuple(lhs.count(), lhs.total(), lhs.min(), lhs.max())
== std::make_tuple(rhs.count(), rhs.total(), rhs.min(), rhs.max());
}
template < typename Type >
inline bool
operator!=(const Record< Type > &lhs, const Record< Type > &rhs)
{
return !(lhs == rhs);
}
using Tag = std::string;
using TagValue = absl::variant< std::string, double, std::int64_t >;
using Tags = std::set< std::pair< Tag, TagValue > >;
template < typename Type >
using TaggedRecordsData = absl::flat_hash_map< Tags, Record< Type > >;
template < typename Type >
struct TaggedRecords
{
Id id;
TaggedRecordsData< Type > data;
explicit TaggedRecords(const Id &_id) : id(_id)
{
}
TaggedRecords(const Id &_id, const TaggedRecordsData< Type > &_data)
: id(_id), data(_data)
{
}
std::ostream &
print(std::ostream &stream, int level, int spaces) const
{
Printer printer(stream, level, spaces);
printer.printAttribute("id", id);
printer.printAttribute("data", data);
return stream;
}
};
template < typename Value >
bool
operator==(const TaggedRecords< Value > &lhs,
const TaggedRecords< Value > &rhs)
{
return std::tie(lhs.id, lhs.data) == std::tie(rhs.id, rhs.data);
}
template < typename Value >
std::ostream &
operator<<(std::ostream &stream, const TaggedRecords< Value > &rec)
{
return rec.print(stream, -1, -1);
}
template < typename Type >
class SampleGroup
{
public:
using RecordType = TaggedRecords< Type >;
using const_iterator =
typename absl::Span< const RecordType >::const_iterator;
private:
absl::Span< const RecordType > m_records;
absl::Duration m_samplePeriod;
public:
SampleGroup() : m_records(), m_samplePeriod()
{
}
SampleGroup(const RecordType *records, size_t size,
absl::Duration samplePeriod)
: m_records(records, size), m_samplePeriod(samplePeriod)
{
}
SampleGroup(const absl::Span< const RecordType > &records,
absl::Duration samplePeriod)
: m_records(records), m_samplePeriod(samplePeriod)
{
}
// clang-format off
void samplePeriod(absl::Duration duration) { m_samplePeriod = duration; }
absl::Duration samplePeriod() const { return m_samplePeriod; }
void records(absl::Span<const RecordType> recs) { m_records = recs; }
absl::Span<const RecordType> records() const { return m_records; }
bool empty() const { return m_records.empty(); }
size_t size() const { return m_records.size(); }
const_iterator begin() const { return m_records.begin(); }
const_iterator end() const { return m_records.end(); }
// clang-format on
std::ostream &
print(std::ostream &stream, int level, int spaces) const
{
Printer::PrintFunction< absl::Duration > durationPrinter =
[](std::ostream &os, const absl::Duration &duration, int,
int) -> std::ostream & {
os << duration;
return os;
};
Printer printer(stream, level, spaces);
printer.printAttribute("records", m_records);
printer.printForeignAttribute("samplePeriod", m_samplePeriod,
durationPrinter);
return stream;
}
};
template < typename Type >
inline std::ostream &
operator<<(std::ostream &stream, const SampleGroup< Type > &group)
{
return group.print(stream, -1, -1);
}
template < typename Type >
inline bool
operator==(const SampleGroup< Type > &lhs, const SampleGroup< Type > &rhs)
{
return lhs.records() == rhs.records()
&& lhs.samplePeriod() == rhs.samplePeriod();
}
class Sample
{
absl::Time m_sampleTime;
std::vector< absl::variant< SampleGroup< double >, SampleGroup< int > > >
m_samples;
size_t m_recordCount{0};
public:
using const_iterator = typename decltype(m_samples)::const_iterator;
Sample() : m_sampleTime()
{
}
// clang-format off
void sampleTime(const absl::Time& time) { m_sampleTime = time; }
absl::Time sampleTime() const { return m_sampleTime; }
template<typename Type>
void pushGroup(const SampleGroup<Type>& group) {
if (!group.empty()) {
m_samples.emplace_back(group);
m_recordCount += group.size();
}
}
template<typename Type>
void pushGroup(const TaggedRecords< Type > *records, size_t size, absl::Duration duration) {
if (size != 0) {
m_samples.emplace_back(SampleGroup<Type>(records, size, duration));
m_recordCount += size;
}
}
template<typename Type>
void pushGroup(const absl::Span< const TaggedRecords< Type > > &records,absl::Duration duration) {
if (!records.empty()) {
m_samples.emplace_back(SampleGroup<Type>(records, duration));
m_recordCount += records.size();
}
}
void clear() {
m_samples.clear();
m_recordCount = 0;
}
const absl::variant<SampleGroup<double>, SampleGroup<int> >& group(size_t index) {
assert(index < m_samples.size());
return m_samples[index];
}
const_iterator begin() const { return m_samples.begin(); }
const_iterator end() const { return m_samples.end(); }
size_t groupCount() const { return m_samples.size(); }
size_t recordCount() const { return m_recordCount; }
// clang-format on
};
inline absl::Duration
samplePeriod(
const absl::variant< SampleGroup< double >, SampleGroup< int > > &group)
{
return absl::visit([](const auto &x) { return x.samplePeriod(); }, group);
}
inline size_t
sampleSize(
const absl::variant< SampleGroup< double >, SampleGroup< int > > &group)
{
return absl::visit([](const auto &x) { return x.size(); }, group);
}
} // namespace metrics
} // namespace llarp
#endif

View file

@ -3,9 +3,7 @@
#include <util/string_view.hpp>
#include <util/meta/traits.hpp>
#include <util/meta/variant.hpp>
#include <absl/types/variant.h>
#include <functional>
#include <iostream>
#include <cassert>
@ -197,11 +195,6 @@ namespace llarp
printType(std::ostream& stream, const std::tuple< Types... >& value,
int level, int spaces, traits::select::Case<>);
template < typename... Types >
static void
printType(std::ostream& stream, const absl::variant< Types... >& value,
int level, int spaces, traits::select::Case<>);
// Default type
template < typename Type >
static void
@ -493,17 +486,6 @@ namespace llarp
[&](const auto& x) { print.printValue(x); });
}
template < typename... Types >
inline void
PrintHelper::printType(std::ostream& stream,
const absl::variant< Types... >& value, int level,
int spaces, traits::select::Case<>)
{
Printer print(stream, level, spaces);
absl::visit([&](const auto& x) { print.printValue(x); }, value);
}
template < typename Type >
inline void
PrintHelper::printType(std::ostream& stream, const Type& value, int level,

View file

@ -1 +0,0 @@
#include <util/status.hpp>

View file

@ -8,7 +8,6 @@
#include <vector>
#include <string>
#include <algorithm>
#include <absl/types/variant.h>
namespace llarp
{

View file

@ -1 +0,0 @@
#include <util/stopwatch.hpp>

View file

@ -1,53 +0,0 @@
#ifndef LLARP_STOPWATCH_HPP
#define LLARP_STOPWATCH_HPP
#include <nonstd/optional.hpp>
#include <absl/time/clock.h>
namespace llarp
{
namespace util
{
class Stopwatch
{
nonstd::optional< absl::Time > m_start;
nonstd::optional< absl::Time > m_stop;
public:
Stopwatch() = default;
void
start()
{
assert(!m_start);
assert(!m_stop);
m_start.emplace(absl::Now());
}
void
stop()
{
assert(m_start);
assert(!m_stop);
m_stop.emplace(absl::Now());
}
bool
done() const
{
return m_start && m_stop;
}
absl::Duration
time() const
{
assert(m_start);
assert(m_stop);
return m_stop.value() - m_start.value();
}
};
} // namespace util
} // namespace llarp
#endif

View file

@ -1 +0,0 @@
#include <util/string_view.hpp>

View file

@ -0,0 +1,61 @@
#pragma once
// Clang thread safety analysis macros. Does nothing under non-clang compilers.
// Enable thread safety attributes only with clang.
// The attributes can be safely erased when compiling with other compilers.
#if defined(__clang__) && (!defined(SWIG))
#define THREAD_ANNOTATION_ATTRIBUTE__(x) __attribute__((x))
#else
#define THREAD_ANNOTATION_ATTRIBUTE__(x) // no-op
#endif
#define CAPABILITY(x) THREAD_ANNOTATION_ATTRIBUTE__(capability(x))
#define SCOPED_CAPABILITY THREAD_ANNOTATION_ATTRIBUTE__(scoped_lockable)
#define GUARDED_BY(x) THREAD_ANNOTATION_ATTRIBUTE__(guarded_by(x))
#define PT_GUARDED_BY(x) THREAD_ANNOTATION_ATTRIBUTE__(pt_guarded_by(x))
#define ACQUIRED_BEFORE(...) \
THREAD_ANNOTATION_ATTRIBUTE__(acquired_before(__VA_ARGS__))
#define ACQUIRED_AFTER(...) \
THREAD_ANNOTATION_ATTRIBUTE__(acquired_after(__VA_ARGS__))
#define REQUIRES(...) \
THREAD_ANNOTATION_ATTRIBUTE__(requires_capability(__VA_ARGS__))
#define REQUIRES_SHARED(...) \
THREAD_ANNOTATION_ATTRIBUTE__(requires_shared_capability(__VA_ARGS__))
#define ACQUIRE(...) \
THREAD_ANNOTATION_ATTRIBUTE__(acquire_capability(__VA_ARGS__))
#define ACQUIRE_SHARED(...) \
THREAD_ANNOTATION_ATTRIBUTE__(acquire_shared_capability(__VA_ARGS__))
#define RELEASE(...) \
THREAD_ANNOTATION_ATTRIBUTE__(release_capability(__VA_ARGS__))
#define RELEASE_SHARED(...) \
THREAD_ANNOTATION_ATTRIBUTE__(release_shared_capability(__VA_ARGS__))
#define TRY_ACQUIRE(...) \
THREAD_ANNOTATION_ATTRIBUTE__(try_acquire_capability(__VA_ARGS__))
#define TRY_ACQUIRE_SHARED(...) \
THREAD_ANNOTATION_ATTRIBUTE__(try_acquire_shared_capability(__VA_ARGS__))
#define EXCLUDES(...) THREAD_ANNOTATION_ATTRIBUTE__(locks_excluded(__VA_ARGS__))
#define ASSERT_CAPABILITY(x) THREAD_ANNOTATION_ATTRIBUTE__(assert_capability(x))
#define ASSERT_SHARED_CAPABILITY(x) \
THREAD_ANNOTATION_ATTRIBUTE__(assert_shared_capability(x))
#define RETURN_CAPABILITY(x) THREAD_ANNOTATION_ATTRIBUTE__(lock_returned(x))
#define NO_THREAD_SAFETY_ANALYSIS \
THREAD_ANNOTATION_ATTRIBUTE__(no_thread_safety_analysis)

View file

@ -0,0 +1,46 @@
#pragma once
#include <mutex>
#include <condition_variable>
namespace llarp
{
namespace util
{
/// Barrier class that blocks all threads until the high water mark of
/// threads (set during construction) is reached, then releases them all.
class Barrier
{
std::mutex mutex;
std::condition_variable cv;
unsigned pending;
public:
Barrier(unsigned threads) : pending{threads}
{
}
/// Returns true if *this* Block call is the one that releases all of
/// them; returns false (i.e. after unblocking) if some other thread
/// triggered the released.
bool
Block()
{
std::unique_lock< std::mutex > lock{mutex};
if(pending == 1)
{
pending = 0;
lock.unlock();
cv.notify_all();
return true;
}
else if(pending > 1)
{
pending--;
}
cv.wait(lock, [this] { return !pending; });
return false;
}
};
} // namespace util
} // namespace llarp

View file

@ -1,7 +1,6 @@
#include <util/thread/logic.hpp>
#include <util/logging/logger.hpp>
#include <util/mem.h>
#include <util/metrics/metrics.hpp>
#include <future>
@ -53,28 +52,10 @@ namespace llarp
Logic::_traceLogicCall(std::function< void(void) > func, const char* tag,
int line)
{
#define TAG (tag ? tag : LOG_TAG)
#define LINE (line ? line : __LINE__)
// wrap the function so that we ensure that it's always calling stuff one at
// a time
#if defined(LOKINET_DEBUG)
#define METRIC(action) \
metrics::integerTick("logic", action, 1, "tag", TAG, "line", \
std::to_string(LINE))
#else
#define METRIC(action) \
do \
{ \
} while(false)
#endif
METRIC("queue");
auto f = [self = this, func, tag, line]() {
#if defined(LOKINET_DEBUG)
metrics::TimerGuard g("logic",
std::string(TAG) + ":" + std::to_string(LINE));
#endif
auto f = [self = this, func]() {
if(self->m_Queue)
{
func();
@ -86,7 +67,6 @@ namespace llarp
};
if(can_flush())
{
METRIC("fired");
f();
return true;
}
@ -97,21 +77,16 @@ namespace llarp
}
if(m_Thread->LooksFull(5))
{
LogErrorExplicit(TAG, LINE,
LogErrorExplicit(tag ? tag : LOG_TAG, line ? line : __LINE__,
"holy crap, we are trying to queue a job "
"onto the logic thread but it looks full");
METRIC("full");
std::abort();
}
auto ret = llarp_threadpool_queue_job(m_Thread, f);
if(not ret)
{
METRIC("dropped");
}
return ret;
#undef TAG
#undef LINE
#undef METRIC
}
void

Some files were not shown because too many files have changed in this diff Show more