Metric collection subsystem

This commit is contained in:
Michael 2019-02-27 21:46:23 +00:00
parent acfff4ca5c
commit f2c5d32399
No known key found for this signature in database
GPG Key ID: 2D51757B47E2434C
17 changed files with 4737 additions and 2 deletions

View File

@ -189,7 +189,7 @@ if(NOT DEBIAN)
endif(NOT DEBIAN)
if(ASAN)
set(DEBUG_FLAGS ${DEBUG_FLAGS} -fsanitize=thread -fno-omit-frame-pointer)
set(DEBUG_FLAGS ${DEBUG_FLAGS} -fsanitize=address -fno-omit-frame-pointer)
set(OPTIMIZE_FLAGS "-O0")
endif(ASAN)

View File

@ -17,12 +17,17 @@ set(LIB_UTIL_SRC
util/logger.cpp
util/logic.cpp
util/mem.cpp
util/metrics_core.cpp
util/metrics_publishers.cpp
util/metrics_types.cpp
util/metrics.cpp
util/object.cpp
util/printer.cpp
util/queue_manager.cpp
util/queue.cpp
util/scheduler.cpp
util/status.cpp
util/stopwatch.cpp
util/str.cpp
util/string_view.cpp
util/thread_pool.cpp

1
llarp/util/metrics.cpp Normal file
View File

@ -0,0 +1 @@
#include <util/metrics.hpp>

179
llarp/util/metrics.hpp Normal file
View File

@ -0,0 +1,179 @@
#ifndef LLARP_METRICS_HPP
#define LLARP_METRICS_HPP
#include <util/metrics_types.hpp>
#include <util/metrics_core.hpp>
namespace llarp
{
namespace metrics
{
struct MetricsHelper
{
static void
initContainer(CategoryContainer& container, const char* category)
{
Manager* manager = DefaultManager::instance();
Registry& registry = manager->registry();
registry.registerContainer(registry.get(category), container);
}
static void
setType(const Id& id, Publication::Type type)
{
Manager* manager = DefaultManager::instance();
return manager->registry().publicationType(id, type);
}
};
} // namespace metrics
} // namespace llarp
// Some MSVC flags mess with __LINE__, but __COUNTER__ is better anyway
#ifdef _MSC_VER
#define METRICS_UNIQ_NUMBER __COUNTER__
#else
#define METRICS_UNIQ_NUMBER __LINE__
#endif
// Use a level of indirection to force the preprocessor to expand args first.
#define METRICS_NAME_CAT_IMP(X, Y) X##Y
#define METRICS_NAME_CAT(X, Y) METRICS_NAME_CAT_IMP(X, Y)
#define METRICS_UNIQUE_NAME(X) METRICS_NAME_CAT(X, METRICS_UNIQ_NUMBER)
#define METRICS_TIME_BLOCK_IMP(CAT, METRIC, VAR_NAME) \
llarp::metrics::DoubleCollector* VAR_NAME = nullptr; \
if(llarp::metrics::DefaultManager::instance()) \
{ \
using namespace llarp::metrics; \
CollectorRepo& repo = DefaultManager::instance()->collectorRepo(); \
VAR_NAME = repo.defaultDoubleCollector((CAT), (METRIC)); \
} \
llarp::metrics::TimerGuard METRICS_UNIQUE_NAME(timer_guard)(VAR_NAME);
#define METRICS_TIME_BLOCK(CAT, METRIC) \
METRICS_TIME_BLOCK_IMP(CAT, METRIC, METRICS_UNIQUE_NAME(time_block))
#define METRICS_IF_CATEGORY_ENABLED_IMP(CAT, NAME) \
static llarp::metrics::CategoryContainer NAME = {false, nullptr, nullptr}; \
if(!NAME.category() && llarp::metrics::DefaultManager::instance()) \
{ \
llarp::metrics::MetricsHelper::initContainer(NAME, CAT); \
} \
if(NAME.enabled())
#define METRICS_IF_CATEGORY_ENABLED(CAT) \
BALM_METRICS_IF_CATEGORY_ENABLED_IMP(CAT, METRICS_UNIQUE_NAME(Container))
// For when the category/metric may change during the program run
#define METRICS_DYNAMIC_INT_UPDATE(CAT, METRIC, VALUE) \
do \
{ \
using namespace llarp::metrics; \
if(DefaultManager::instance()) \
{ \
CollectorRepo& repository = DefaultManager::instance()->collectorRepo(); \
IntCollector* collector = \
repository.defaultIntCollector((CAT), (METRIC)); \
if(collector->id().category()->enabled()) \
{ \
collector->tick((VALUE)); \
} \
} \
} while(false)
// For when the category/metric remain static
#define METRICS_INT_UPDATE(CAT, METRIC, VALUE) \
do \
{ \
using namespace llarp::metrics; \
static CategoryContainer container = {false, nullptr, nullptr}; \
static IntCollector* collector = nullptr; \
if(container.category() == nullptr && DefaultManager::instance()) \
{ \
collector = MetricHelper::getIntCollector(CAT, METRIC); \
MetricHelper::initContainer(container, CAT); \
} \
if(container.enabled()) \
{ \
collector->tick(VALUE); \
} \
} while(false)
#define METRICS_TYPED_INT_UPDATE(CAT, METRIC, VALUE, TYPE) \
do \
{ \
using namespace llarp::metrics; \
static CategoryContainer container = {false, nullptr, nullptr}; \
static IntCollector* collector = nullptr; \
if(container.category() == nullptr && DefaultManager::instance()) \
{ \
collector = MetricHelper::getIntCollector(CAT, METRIC); \
MetricHelper::setType(collector->id(), TYPE); \
MetricHelper::initContainer(container, CAT); \
} \
if(container.enabled()) \
{ \
collector->tick(VALUE); \
} \
} while(false)
// For when the category/metric may change during the program run
#define METRICS_DYNAMIC_UPDATE(CAT, METRIC, VALUE) \
do \
{ \
using namespace llarp::metrics; \
if(DefaultManager::instance()) \
{ \
CollectorRepo& repository = DefaultManager::instance()->collectorRepo(); \
DoubleCollector* collector = \
repository.defaultDoubleCollector((CAT), (METRIC)); \
if(collector->id().category()->enabled()) \
{ \
collector->tick((VALUE)); \
} \
} \
} while(false)
// For when the category/metric remain static
#define METRICS_UPDATE(CAT, METRIC, VALUE) \
do \
{ \
using namespace llarp::metrics; \
static CategoryContainer container = {false, nullptr, nullptr}; \
static DoubleCollector* collector = nullptr; \
if(container.category() == nullptr && DefaultManager::instance()) \
{ \
collector = MetricHelper::getDoubleCollector(CAT, METRIC); \
MetricHelper::initContainer(container, CAT); \
} \
if(container.enabled()) \
{ \
collector->tick(VALUE); \
} \
} while(false)
#define METRICS_TYPED_UPDATE(CAT, METRIC, VALUE, TYPE) \
do \
{ \
using namespace llarp::metrics; \
static CategoryContainer container = {false, nullptr, nullptr}; \
static DoubleCollector* collector = nullptr; \
if(container.category() == nullptr && DefaultManager::instance()) \
{ \
collector = MetricHelper::getDoubleCollector(CAT, METRIC); \
MetricHelper::setType(collector->id(), TYPE); \
MetricHelper::initContainer(container, CAT); \
} \
if(container.enabled()) \
{ \
collector->tick(VALUE); \
} \
} while(false)
#define METRICS_DYNAMIC_INCREMENT(CAT, METRIC) \
METRICS_DYNAMIC_INT_UPDATE(CAT, METRIC, 1)
#define METRICS_INCREMENT(CAT, METRIC) METRICS_INT_UPDATE(CAT, METRIC, 1)
#endif

910
llarp/util/metrics_core.cpp Normal file
View File

@ -0,0 +1,910 @@
#include <util/metrics_core.hpp>
#include <iostream>
namespace llarp
{
namespace metrics
{
Record
IntCollector::loadAndClear()
{
size_t count;
uint64_t total;
int min;
int max;
{
absl::WriterMutexLock l(&m_mutex);
count = m_count;
total = m_total;
min = m_min;
max = m_max;
m_count = 0;
m_total = 0;
m_min = DEFAULT_MIN;
m_max = DEFAULT_MAX;
}
return {m_id, count, static_cast< double >(total),
(min == DEFAULT_MIN) ? Record::DEFAULT_MIN
: static_cast< double >(min),
(max == DEFAULT_MAX) ? Record::DEFAULT_MAX
: static_cast< double >(max)};
}
Record
IntCollector::load()
{
size_t count;
int64_t total;
int min;
int max;
{
absl::ReaderMutexLock l(&m_mutex);
count = m_count;
total = m_total;
min = m_min;
max = m_max;
}
return {m_id, count, static_cast< double >(total),
(min == DEFAULT_MIN) ? Record::DEFAULT_MIN : min,
(max == DEFAULT_MAX) ? Record::DEFAULT_MAX : max};
}
std::tuple< Id, bool >
Registry::insert(const char *category, const char *name)
{
// avoid life time issues, putting strings in the stringmem set
const char *cStr = m_stringmem.emplace(category).first->c_str();
const char *nStr = m_stringmem.emplace(name).first->c_str();
NamedCategory namedCategory(cStr, nStr);
const auto it = m_metrics.find(namedCategory);
if(it != m_metrics.end())
{
return std::make_tuple(Id(it->second.get()), false);
}
auto cIt = m_categories.find(cStr);
if(cIt == m_categories.end())
{
auto ptr = std::make_shared< Category >(cStr, m_defaultEnabled);
cIt = m_categories.emplace(cStr, ptr).first;
}
const auto mPtr =
std::make_shared< Description >(cIt->second.get(), nStr);
m_metrics.emplace(namedCategory, mPtr);
return {Id(mPtr.get()), true};
}
Id
Registry::add(const char *category, const char *name)
{
absl::WriterMutexLock l(&m_mutex);
auto result = insert(category, name);
return std::get< 1 >(result) ? std::get< 0 >(result) : Id();
}
Id
Registry::get(const char *category, const char *name)
{
Id result = findId(category, name);
if(result)
{
return result;
}
absl::WriterMutexLock l(&m_mutex);
return std::get< 0 >(insert(category, name));
}
const Category *
Registry::add(const char *category)
{
absl::WriterMutexLock l(&m_mutex);
const char *cStr = m_stringmem.emplace(category).first->c_str();
auto it = m_categories.find(cStr);
if(it == m_categories.end())
{
auto ptr = std::make_shared< Category >(cStr, m_defaultEnabled);
it = m_categories.emplace(cStr, ptr).first;
return it->second.get();
}
return nullptr;
}
const Category *
Registry::get(const char *category)
{
const Category *cPtr = findCategory(category);
if(cPtr)
{
return cPtr;
}
absl::WriterMutexLock l(&m_mutex);
const char *cStr = m_stringmem.emplace(category).first->c_str();
auto it = m_categories.find(cStr);
if(it == m_categories.end())
{
auto ptr = std::make_shared< Category >(cStr, m_defaultEnabled);
it = m_categories.emplace(cStr, ptr).first;
}
return it->second.get();
}
void
Registry::enable(const Category *category, bool value)
{
absl::WriterMutexLock l(&m_mutex);
const_cast< Category * >(category)->enabled(value);
}
void
Registry::enableAll(bool value)
{
absl::WriterMutexLock l(&m_mutex);
if(value == m_defaultEnabled)
{
return;
}
m_defaultEnabled = value;
std::for_each(m_categories.begin(), m_categories.end(),
[&](auto &x) { x.second->enabled(value); });
}
void
Registry::registerContainer(const Category *category,
CategoryContainer &container)
{
absl::WriterMutexLock l(&m_mutex);
if(container.m_category == nullptr)
{
const_cast< Category * >(category)->registerContainer(&container);
}
}
void
Registry::publicationType(const Id &id, Publication::Type type)
{
const_cast< Description * >(id.description())->type(type);
}
void
Registry::setFormat(const Id &id, const Format &format)
{
Description *description = const_cast< Description * >(id.description());
absl::WriterMutexLock l(&m_mutex);
auto fmtPtr = std::make_shared< Format >(format);
for(byte_t i = 0; i < Publication::MaxSize; ++i)
{
auto type = static_cast< Publication::Type >(i);
const FormatSpec *spec = format.specFor(type);
if(spec != nullptr)
{
const char *fmt = m_stringmem.emplace(spec->m_format).first->c_str();
fmtPtr->setSpec(type, FormatSpec(spec->m_scale, fmt));
}
}
description->format(fmtPtr);
}
const Category *
Registry::findCategory(const char *category) const
{
absl::ReaderMutexLock l(&m_mutex);
auto it = m_categories.find(category);
return it == m_categories.end() ? nullptr : it->second.get();
}
Id
Registry::findId(const char *category, const char *name) const
{
absl::ReaderMutexLock l(&m_mutex);
auto it = m_metrics.find(std::make_tuple(category, name));
return it == m_metrics.end() ? Id() : Id(it->second.get());
}
std::vector< const Category * >
Registry::getAll() const
{
absl::ReaderMutexLock l(&m_mutex);
std::vector< const Category * > result;
result.reserve(m_categories.size());
std::transform(m_categories.begin(), m_categories.end(),
std::back_inserter(result),
[](const auto &x) { return x.second.get(); });
return result;
}
MetricCollectors &
CollectorRepo::getCollectors(const Id &id)
{
auto it = m_collectors.find(id);
if(it == m_collectors.end())
{
assert(id.valid());
const Category *cat = id.category();
auto ptr = std::make_shared< MetricCollectors >(id);
auto &vec = m_categories[cat];
vec.reserve(vec.size() + 1);
it = m_collectors.emplace(id, ptr).first;
vec.push_back(ptr.get());
}
return *it->second.get();
}
std::vector< Record >
CollectorRepo::collectAndClear(const Category *category)
{
absl::WriterMutexLock l(&m_mutex);
std::vector< Record > result;
auto it = m_categories.find(category);
if(it != m_categories.end())
{
auto &collectors = it->second;
result.reserve(collectors.size());
std::transform(
collectors.begin(), collectors.end(), std::back_inserter(result),
[](MetricCollectors *c) { return c->combineAndClear(); });
}
return result;
}
std::vector< Record >
CollectorRepo::collect(const Category *category)
{
absl::WriterMutexLock l(&m_mutex);
std::vector< Record > result;
auto it = m_categories.find(category);
if(it != m_categories.end())
{
auto &collectors = it->second;
result.reserve(collectors.size());
std::transform(collectors.begin(), collectors.end(),
std::back_inserter(result),
[](MetricCollectors *c) { return c->combine(); });
}
return result;
}
DoubleCollector *
CollectorRepo::defaultDoubleCollector(const Id &id)
{
{
absl::ReaderMutexLock l(&m_mutex);
auto it = m_collectors.find(id);
if(it != m_collectors.end())
{
return it->second->doubleCollectors().defaultCollector();
}
}
{
absl::WriterMutexLock l(&m_mutex);
return getCollectors(id).doubleCollectors().defaultCollector();
}
}
IntCollector *
CollectorRepo::defaultIntCollector(const Id &id)
{
{
absl::ReaderMutexLock l(&m_mutex);
auto it = m_collectors.find(id);
if(it != m_collectors.end())
{
return it->second->intCollectors().defaultCollector();
}
}
{
absl::WriterMutexLock l(&m_mutex);
return getCollectors(id).intCollectors().defaultCollector();
}
}
std::pair< std::vector< std::shared_ptr< DoubleCollector > >,
std::vector< std::shared_ptr< IntCollector > > >
CollectorRepo::allCollectors(const Id &id)
{
absl::ReaderMutexLock l(&m_mutex);
auto it = m_collectors.find(id);
if(it == m_collectors.end())
{
return {};
}
return {it->second->doubleCollectors().collectors(),
it->second->intCollectors().collectors()};
}
struct PublisherHelper
{
using SampleCache = std::map< std::shared_ptr< Publisher >, Sample >;
static void
updateSampleCache(SampleCache &cache,
const std::shared_ptr< Publisher > &publisher,
const SampleGroup &sampleGroup,
const absl::Time &timeStamp)
{
SampleCache::iterator it = cache.find(publisher);
if(it == cache.end())
{
Sample newSample;
newSample.sampleTime(timeStamp);
it = cache.emplace(publisher, newSample).first;
}
it->second.pushGroup(sampleGroup);
}
static std::pair< std::vector< Record >, absl::Duration >
collect(Manager &manager, const Category *category,
const absl::Duration &now, bool clear)
EXCLUSIVE_LOCKS_REQUIRED(manager.m_mutex)
{
using Callback = Manager::RecordCallback;
using CallbackVector = std::vector< const Callback * >;
using RegistryIterator = CallbackRegistry::iterator;
CallbackVector callbacks;
RegistryIterator begin = manager.m_callbacks.lowerBound(category);
RegistryIterator end = manager.m_callbacks.upperBound(category);
std::vector< Record > result;
std::for_each(begin, end, [&](const auto &x) {
std::vector< Record > tmp = (x.second)(clear);
result.insert(result.end(), tmp.begin(), tmp.end());
});
// Collect records from the repo.
if(clear)
{
std::vector< Record > tmp = manager.m_repo.collectAndClear(category);
result.insert(result.end(), tmp.begin(), tmp.end());
}
else
{
std::vector< Record > tmp = manager.m_repo.collect(category);
result.insert(result.end(), tmp.begin(), tmp.end());
}
// Get the time since last reset, and clear if needed.
Manager::ResetTimes::iterator it = manager.m_resetTimes.find(category);
if(it == manager.m_resetTimes.end())
{
if(clear)
{
manager.m_resetTimes.emplace(category, now);
}
return {result, now - manager.m_createTime};
}
else
{
auto tmp = now - it->second;
if(clear)
{
it->second = now;
}
return {result, tmp};
}
}
template < typename CategoryIterator >
static void
publish(Manager &manager, const CategoryIterator &categoriesBegin,
const CategoryIterator &categoriesEnd, bool clear)
{
if(categoriesBegin == categoriesEnd)
{
return;
}
using RecordBuffer =
std::vector< std::shared_ptr< std::vector< Record > > >;
RecordBuffer recordBuffer;
SampleCache sampleCache;
absl::Time timeStamp = absl::Now();
absl::Duration now = absl::Now() - absl::UnixEpoch();
{
// 1.
absl::WriterMutexLock publishGuard(&manager.m_publishLock);
// 2.
absl::WriterMutexLock propertiesGuard(&manager.m_mutex);
// Build the 'sampleCache' by iterating over the categories and
// collecting records for those categories.
for(CategoryIterator catIt = categoriesBegin; catIt != categoriesEnd;
++catIt)
{
if(!(*catIt)->enabled())
{
continue;
}
// Collect the metrics.
auto result = collect(manager, *catIt, now, clear);
// If their are no collected records then this category can be
// ignored.
if(result.first.empty())
{
continue;
}
if(result.second == absl::Duration())
{
std::cerr << "Invalid elapsed time interval of 0 for "
"published metrics.";
result.second += absl::Nanoseconds(1);
}
// Append the collected records to the buffer of records.
auto records =
std::make_shared< std::vector< Record > >(result.first);
recordBuffer.push_back(records);
SampleGroup sampleGroup(absl::Span< Record >(*records),
result.second);
std::for_each(
manager.m_publishers.globalBegin(),
manager.m_publishers.globalEnd(), [&](const auto &ptr) {
updateSampleCache(sampleCache, ptr, sampleGroup, timeStamp);
});
std::for_each(manager.m_publishers.lowerBound(*catIt),
manager.m_publishers.upperBound(*catIt),
[&](const auto &val) {
updateSampleCache(sampleCache, val.second,
sampleGroup, timeStamp);
});
}
}
for(auto &entry : sampleCache)
{
Publisher *publisher = entry.first.get();
Sample &sample = entry.second;
publisher->publish(sample);
}
}
};
Sample
Manager::collectSample(std::vector< Record > &records,
absl::Span< const Category * > categories,
bool clear)
{
absl::Time timeStamp = absl::Now();
absl::Duration now = timeStamp - absl::UnixEpoch();
Sample sample;
sample.sampleTime(timeStamp);
// Use a tuple to hold 'references' to the collected records
using SampleDescription = std::tuple< size_t, size_t, absl::Duration >;
std::vector< SampleDescription > samples;
samples.reserve(categories.size());
// 1
absl::WriterMutexLock publishGuard(&m_publishLock);
// 2
absl::WriterMutexLock propertiesGuard(&m_mutex);
for(const Category *const category : categories)
{
if(!category->enabled())
{
continue;
}
size_t beginIndex = records.size();
// Collect the metrics.
std::vector< Record > catRecords;
absl::Duration elapsedTime;
std::tie(catRecords, elapsedTime) =
PublisherHelper::collect(*this, category, now, clear);
records.insert(records.end(), catRecords.begin(), catRecords.end());
size_t size = records.size() - beginIndex;
// If there are no collected records then this category can be ignored.
if(size != 0)
{
samples.emplace_back(beginIndex, size, elapsedTime);
}
}
// Now that we have all the records, we can build our sample
for(const SampleDescription &s : samples)
{
sample.pushGroup(&records[std::get< 0 >(s)], std::get< 1 >(s),
std::get< 2 >(s));
}
return sample;
}
void
Manager::publish(absl::Span< const Category * > categories, bool clear)
{
PublisherHelper::publish(*this, categories.begin(), categories.end(),
clear);
}
void
Manager::publish(const std::set< const Category * > &categories, bool clear)
{
PublisherHelper::publish(*this, categories.begin(), categories.end(),
clear);
}
Manager *DefaultManager::m_manager = nullptr;
struct PublisherSchedulerData
{
util::Mutex m_mutex;
thread::Scheduler::Handle m_handle;
std::set< const Category * > m_categories;
bool m_default;
std::set< const Category * > m_nonDefaultCategories;
PublisherSchedulerData()
: m_handle(thread::Scheduler::INVALID_HANDLE), m_default(false)
{
}
};
// Reverts a publisher scheduler back to its default state
class PublisherSchedulerGuard
{
PublisherScheduler *m_scheduler;
public:
PublisherSchedulerGuard(PublisherScheduler *scheduler)
: m_scheduler(scheduler)
{
}
~PublisherSchedulerGuard()
{
if(m_scheduler != nullptr)
{
for(auto &repeat : m_scheduler->m_repeaters)
{
if(repeat.second->m_handle != thread::Scheduler::INVALID_HANDLE)
{
m_scheduler->m_scheduler.cancelRepeat(repeat.second->m_handle);
}
}
m_scheduler->m_defaultInterval = absl::Duration();
m_scheduler->m_repeaters.clear();
m_scheduler->m_categories.clear();
}
}
void
release()
{
m_scheduler = nullptr;
}
};
void
PublisherScheduler::publish(
const std::shared_ptr< PublisherSchedulerData > &data) const
{
util::Lock l(&data->m_mutex);
if(data->m_default)
{
m_manager->publishAllExcluding(data->m_nonDefaultCategories);
}
else if(!data->m_categories.empty())
{
m_manager->publish(data->m_categories);
}
}
void
PublisherScheduler::cancel(Categories::iterator it)
{
assert(it != m_categories.end());
auto repeatIt = m_repeaters.find(it->second);
assert(repeatIt != m_repeaters.end());
const Category *category = it->first;
m_categories.erase(it);
auto data = repeatIt->second;
{
util::Lock l(&data->m_mutex);
assert(data->m_categories.find(category) != data->m_categories.end());
data->m_categories.erase(category);
}
if(!data->m_default)
{
if(data->m_categories.empty())
{
m_scheduler.cancelRepeat(data->m_handle);
m_repeaters.erase(repeatIt);
}
if(m_defaultInterval != absl::Duration())
{
auto defaultIntervalIt = m_repeaters.find(m_defaultInterval);
assert(defaultIntervalIt != m_repeaters.end());
auto &defaultRepeater = defaultIntervalIt->second;
util::Lock l(&defaultRepeater->m_mutex);
defaultRepeater->m_nonDefaultCategories.erase(category);
}
}
}
bool
PublisherScheduler::cancelDefault()
{
if(m_defaultInterval == absl::Duration())
{
return false;
}
absl::Duration interval = m_defaultInterval;
m_defaultInterval = absl::Duration();
auto repeatIt = m_repeaters.find(interval);
assert(repeatIt != m_repeaters.end());
auto data = repeatIt->second;
if(data->m_categories.empty())
{
assert(data->m_handle != thread::Scheduler::INVALID_HANDLE);
m_scheduler.cancelRepeat(data->m_handle);
m_repeaters.erase(repeatIt);
}
else
{
util::Lock l(&data->m_mutex);
data->m_default = false;
data->m_nonDefaultCategories.clear();
}
return true;
}
void
PublisherScheduler::schedule(const Category *category,
absl::Duration interval)
{
assert(absl::Seconds(0) < interval);
util::Lock l(&m_mutex);
auto catIt = m_categories.find(category);
if(catIt != m_categories.end())
{
if(catIt->second == interval)
{
return;
}
cancel(catIt);
}
// Make a guard, so if something throws, the scheduler is reset to a
// somewhat "sane" state (no metrics).
PublisherSchedulerGuard guard(this);
m_categories.emplace(category, interval);
auto repeatIt = m_repeaters.find(interval);
std::shared_ptr< PublisherSchedulerData > data;
// Create a new 'ClockData' object if one does not exist for the
// 'interval', otherwise update the existing 'data'.
if(repeatIt == m_repeaters.end())
{
data = std::make_shared< PublisherSchedulerData >();
data->m_categories.insert(category);
m_repeaters.emplace(interval, data);
util::Lock lock(&data->m_mutex);
data->m_handle = m_scheduler.scheduleRepeat(
interval, std::bind(&PublisherScheduler::publish, this, data));
}
else
{
data = repeatIt->second;
util::Lock lock(&data->m_mutex);
data->m_categories.insert(category);
}
// If this isn't being added to the default schedule, then add to the set
// of non-default categories in the default schedule.
if(!data->m_default && m_defaultInterval != absl::Duration())
{
auto defaultIntervalIt = m_repeaters.find(m_defaultInterval);
assert(defaultIntervalIt != m_repeaters.end());
auto &defaultInterval = defaultIntervalIt->second;
util::Lock lock(&defaultInterval->m_mutex);
defaultInterval->m_nonDefaultCategories.insert(category);
}
guard.release();
}
void
PublisherScheduler::setDefault(absl::Duration interval)
{
assert(absl::Seconds(0) < interval);
util::Lock l(&m_mutex);
// If its already this interval, return early.
if(interval == m_defaultInterval)
{
return;
}
cancelDefault();
m_defaultInterval = interval;
// Make a guard, so if something throws, the scheduler is reset to a
// somewhat "sane" state (no metrics).
PublisherSchedulerGuard guard(this);
std::shared_ptr< PublisherSchedulerData > data;
auto repeatIt = m_repeaters.find(interval);
if(repeatIt == m_repeaters.end())
{
data = std::make_shared< PublisherSchedulerData >();
m_repeaters.emplace(interval, data);
}
else
{
data = repeatIt->second;
}
util::Lock lock(&data->m_mutex);
data->m_default = true;
Categories::iterator cIt = m_categories.begin();
for(; cIt != m_categories.end(); ++cIt)
{
if(cIt->second != interval)
{
data->m_nonDefaultCategories.insert(cIt->first);
}
}
if(data->m_handle == thread::Scheduler::INVALID_HANDLE)
{
data->m_handle = m_scheduler.scheduleRepeat(
interval, std::bind(&PublisherScheduler::publish, this, data));
}
guard.release();
}
bool
PublisherScheduler::cancel(const Category *category)
{
util::Lock l(&m_mutex);
Categories::iterator it = m_categories.find(category);
if(it == m_categories.end())
{
// This category has no specific schedule.
return false;
}
cancel(it);
return true;
}
bool
PublisherScheduler::clearDefault()
{
util::Lock l(&m_mutex);
return cancelDefault();
}
void
PublisherScheduler::cancelAll()
{
util::Lock l(&m_mutex);
for(auto &repeat : m_repeaters)
{
m_scheduler.cancelRepeat(repeat.second->m_handle, true);
}
m_defaultInterval = absl::Duration();
m_repeaters.clear();
m_categories.clear();
}
absl::optional< absl::Duration >
PublisherScheduler::find(const Category *category) const
{
util::Lock l(&m_mutex);
auto it = m_categories.find(category);
if(it == m_categories.end())
{
return {};
}
else
{
return it->second;
}
}
absl::optional< absl::Duration >
PublisherScheduler::getDefault() const
{
util::Lock l(&m_mutex);
if(m_defaultInterval == absl::Duration())
{
return {};
}
else
{
return m_defaultInterval;
}
}
std::vector< std::pair< const Category *, absl::Duration > >
PublisherScheduler::getAll() const
{
util::Lock l(&m_mutex);
std::vector< std::pair< const Category *, absl::Duration > > result;
result.reserve(m_categories.size());
std::copy(m_categories.begin(), m_categories.end(),
std::back_inserter(result));
return result;
}
} // namespace metrics
} // namespace llarp

1335
llarp/util/metrics_core.hpp Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,189 @@
#include <util/metrics_publishers.hpp>
#include <iostream>
namespace llarp
{
namespace metrics
{
namespace
{
void
formatValue(std::ostream &stream, size_t value,
const FormatSpec *formatSpec)
{
if(formatSpec)
{
FormatSpec::format(stream, (double)value, *formatSpec);
}
else
{
stream << value;
}
}
void
formatValue(std::ostream &stream, int value, const FormatSpec *formatSpec)
{
if(formatSpec)
{
FormatSpec::format(stream, (double)value, *formatSpec);
}
else
{
stream << value;
}
}
void
formatValue(std::ostream &stream, double value,
const FormatSpec *formatSpec)
{
if(formatSpec)
{
FormatSpec::format(stream, value, *formatSpec);
}
else
{
stream << value;
}
}
void
formatValue(std::ostream &stream, const Record &record,
double elapsedTime, Publication::Type publicationType,
const FormatSpec *formatSpec)
{
switch(publicationType)
{
case Publication::Type::Unspecified:
{
assert(false && "Invalid publication type");
}
break;
case Publication::Type::Total:
{
formatValue(stream, record.total(), formatSpec);
}
break;
case Publication::Type::Count:
{
formatValue(stream, record.count(), formatSpec);
}
break;
case Publication::Type::Min:
{
formatValue(stream, record.min(), formatSpec);
}
break;
case Publication::Type::Max:
{
formatValue(stream, record.max(), formatSpec);
}
break;
case Publication::Type::Avg:
{
formatValue(stream, record.total() / record.count(), formatSpec);
}
break;
case Publication::Type::Rate:
{
formatValue(stream, record.total() / elapsedTime, formatSpec);
}
break;
case Publication::Type::RateCount:
{
formatValue(stream, record.count() / elapsedTime, formatSpec);
}
break;
}
}
void
publishRecord(std::ostream &stream, const Record &record,
double elapsedTime)
{
auto publicationType = record.id().description()->type();
std::shared_ptr< const Format > format =
record.id().description()->format();
stream << "\t\t" << record.id() << " [ ";
if(publicationType != Publication::Type::Unspecified)
{
stream << Publication::repr(publicationType) << " = ";
const FormatSpec *formatSpec =
format ? format->specFor(publicationType) : nullptr;
formatValue(stream, record, elapsedTime, publicationType, formatSpec);
}
else
{
const FormatSpec *countSpec = nullptr;
const FormatSpec *totalSpec = nullptr;
const FormatSpec *minSpec = nullptr;
const FormatSpec *maxSpec = nullptr;
if(format)
{
countSpec = format->specFor(Publication::Type::Count);
totalSpec = format->specFor(Publication::Type::Total);
minSpec = format->specFor(Publication::Type::Min);
maxSpec = format->specFor(Publication::Type::Max);
}
stream << "count = ";
formatValue(stream, record.count(), countSpec);
stream << ", total = ";
formatValue(stream, record.total(), totalSpec);
if(Record::DEFAULT_MIN == record.min())
{
stream << ", min = undefined";
}
else
{
stream << ", min = ";
formatValue(stream, record.min(), minSpec);
}
if(Record::DEFAULT_MAX == record.max())
{
stream << ", max = undefined";
}
else
{
stream << ", max = ";
formatValue(stream, record.max(), maxSpec);
}
}
stream << " ]\n";
}
} // namespace
void
StreamPublisher::publish(const Sample &values)
{
if(values.recordCount() > 0)
{
m_stream << values.sampleTime() << " " << values.recordCount()
<< " Records\n";
auto gIt = values.begin();
auto prev = values.begin();
for(; gIt != values.end(); ++gIt)
{
const double elapsedTime = absl::ToDoubleSeconds(gIt->samplePeriod());
if(gIt == prev || gIt->samplePeriod() != prev->samplePeriod())
{
m_stream << "\tElapsed Time: " << elapsedTime << "s\n";
}
for(const auto &record : *gIt)
{
publishRecord(m_stream, record, elapsedTime);
}
prev = gIt;
}
}
}
} // namespace metrics
} // namespace llarp

View File

@ -0,0 +1,32 @@
#ifndef LLARP_METRICS_PUBLISHERS_HPP
#define LLARP_METRICS_PUBLISHERS_HPP
#include <util/metrics_core.hpp>
#include <iosfwd>
namespace llarp
{
namespace metrics
{
class StreamPublisher final : public Publisher
{
std::ostream& m_stream;
public:
StreamPublisher(std::ostream& stream) : m_stream(stream)
{
}
~StreamPublisher()
{
}
void
publish(const Sample& values) override;
};
} // namespace metrics
} // namespace llarp
#endif

View File

@ -0,0 +1,171 @@
#include <util/metrics_types.hpp>
#include <util/printer.hpp>
namespace llarp
{
namespace metrics
{
const char *FormatSpec::DEFAULT_FORMAT = "%f";
std::ostream &
FormatSpec::format(std::ostream &stream, double data,
const FormatSpec &format)
{
static constexpr size_t INIT_SIZE = 32;
char buf[INIT_SIZE] = {0};
int rc = snprintf(buf, INIT_SIZE, format.m_format, data * format.m_scale);
if(rc < 0)
{
stream << "Bad format " << format.m_format << " applied to " << data;
return stream;
}
if(static_cast< size_t >(rc) < INIT_SIZE)
{
stream << buf;
return stream;
}
std::vector< char > vec(rc + 1);
rc = snprintf(vec.data(), vec.size(), format.m_format,
data * format.m_scale);
if(static_cast< size_t >(rc) > vec.size())
{
stream << "Bad format " << format.m_format << " applied to " << data;
return stream;
}
else
{
stream << vec.data();
return stream;
}
}
string_view
Publication::repr(Type val)
{
switch(val)
{
case Type::Unspecified:
return "Unspecified";
case Type::Total:
return "Total";
case Type::Count:
return "Count";
case Type::Min:
return "Min";
case Type::Max:
return "Max";
case Type::Avg:
return "Avg";
case Type::Rate:
return "Rate";
case Type::RateCount:
return "RateCount";
}
}
std::ostream &
Publication::print(std::ostream &stream, Type val)
{
stream << repr(val);
return stream;
}
Category::~Category()
{
while(m_container)
{
auto next = m_container->m_nextCategory;
m_container->clear();
m_container = next;
}
}
void
Category::enabled(bool val)
{
// sync point
if(m_enabled != val)
{
auto cont = m_container;
while(cont)
{
cont->m_enabled = val;
cont = cont->m_nextCategory;
}
m_enabled = val;
}
}
void
Category::registerContainer(CategoryContainer *container)
{
container->m_enabled = m_enabled;
container->m_category = this;
container->m_nextCategory = m_container;
m_container = container;
}
std::ostream &
Category::print(std::ostream &stream, int level, int spaces) const
{
Printer printer(stream, level, spaces);
printer.printAttribute("name", m_name);
printer.printAttribute("enabled",
m_enabled.load(std::memory_order_relaxed));
return stream;
}
std::ostream &
Description::print(std::ostream &stream) const
{
util::Lock l(&m_mutex);
stream << m_category->name() << '.' << m_name;
return stream;
}
const double Record::DEFAULT_MIN = std::numeric_limits< double >::max() * 2;
const double Record::DEFAULT_MAX =
std::numeric_limits< double >::max() * -2;
std::ostream &
Record::print(std::ostream &stream, int level, int spaces) const
{
Printer printer(stream, level, spaces);
printer.printAttribute("id", m_id);
printer.printAttribute("count", m_count);
printer.printAttribute("total", m_total);
printer.printAttribute("min", m_min);
printer.printAttribute("max", m_max);
return stream;
}
std::ostream &
SampleGroup::print(std::ostream &stream, int level, int spaces) const
{
Printer::PrintFunction< absl::Duration > durationPrinter =
[](std::ostream &stream, const absl::Duration &duration, int,
int) -> std::ostream & {
stream << duration;
return stream;
};
Printer printer(stream, level, spaces);
printer.printAttribute("records", m_records);
printer.printForeignAttribute("samplePeriod", m_samplePeriod,
durationPrinter);
return stream;
}
} // namespace metrics
} // namespace llarp

View File

@ -0,0 +1,532 @@
#ifndef LLARP_METRICS_TYPES_HPP
#define LLARP_METRICS_TYPES_HPP
#include <util/string_view.hpp>
#include <util/threading.hpp>
#include <util/types.hpp>
#include <absl/types/span.h>
#include <iosfwd>
namespace llarp
{
namespace metrics
{
struct Publication
{
enum class Type : byte_t
{
Unspecified = 0, // no associated metric type
Total, // sum of seen values in the measurement period
Count, // count of seen events
Min, // Minimum value
Max, // Max value
Avg, // total / count
Rate, // total per second
RateCount // count per second
};
enum
{
MaxSize = static_cast< byte_t >(Type::RateCount) + 1
};
static string_view
repr(Type val);
static std::ostream &
print(std::ostream &stream, Type val);
};
struct FormatSpec
{
float m_scale;
const char *m_format;
static const char *DEFAULT_FORMAT;
FormatSpec() : m_scale(1.0), m_format(DEFAULT_FORMAT)
{
}
FormatSpec(float scale, const char *format)
: m_scale(scale), m_format(format)
{
}
static std::ostream &
format(std::ostream &stream, double data, const FormatSpec &spec);
};
inline bool
operator==(const FormatSpec &lhs, const FormatSpec &rhs)
{
return lhs.m_scale == rhs.m_scale
&& std::strcmp(lhs.m_format, rhs.m_format) == 0;
}
struct Format
{
using Spec = absl::optional< FormatSpec >;
std::array< Spec, Publication::MaxSize > m_specs;
Format() : m_specs()
{
}
void
setSpec(Publication::Type pub, const FormatSpec &spec)
{
m_specs[static_cast< size_t >(pub)].emplace(spec);
}
void
clear()
{
m_specs = decltype(m_specs)();
}
const FormatSpec *
specFor(Publication::Type val) const
{
const auto &spec = m_specs[static_cast< size_t >(val)];
return spec ? &spec.value() : nullptr;
}
};
inline bool
operator==(const Format &lhs, const Format &rhs)
{
return lhs.m_specs == rhs.m_specs;
}
struct CategoryContainer;
/// Represents a category of grouped metrics
class Category
{
const char *m_name;
std::atomic_bool m_enabled;
CategoryContainer *m_container;
public:
Category(const char *name, bool enabled = true)
: m_name(name), m_enabled(enabled), m_container(nullptr)
{
}
~Category();
void
enabled(bool flag);
void
registerContainer(CategoryContainer *container);
const std::atomic_bool &
enabledRaw() const
{
return m_enabled;
}
const char *
name() const
{
return m_name;
}
bool
enabled() const
{
return m_enabled;
}
std::ostream &
print(std::ostream &stream, int level, int spaces) const;
};
inline std::ostream &
operator<<(std::ostream &stream, const Category &c)
{
return c.print(stream, -1, -1);
}
struct CategoryContainer
{
bool m_enabled;
const Category *m_category;
CategoryContainer *m_nextCategory;
void
clear()
{
m_enabled = false;
m_category = nullptr;
m_nextCategory = nullptr;
}
};
class Description
{
mutable util::Mutex m_mutex;
const Category *m_category GUARDED_BY(m_mutex);
const char *m_name GUARDED_BY(m_mutex);
Publication::Type m_type GUARDED_BY(m_mutex);
std::shared_ptr< Format > m_format GUARDED_BY(m_mutex);
Description(const Description &) = delete;
Description &
operator=(const Description &) = delete;
public:
Description(const Category *category, const char *name)
: m_category(category)
, m_name(name)
, m_type(Publication::Type::Unspecified)
{
}
void
category(const Category *c)
{
util::Lock l(&m_mutex);
m_category = c;
}
const Category *
category() const
{
util::Lock l(&m_mutex);
return m_category;
}
void
name(const char *n)
{
util::Lock l(&m_mutex);
m_name = n;
}
const char *
name() const
{
util::Lock l(&m_mutex);
return m_name;
}
void
type(Publication::Type t)
{
util::Lock l(&m_mutex);
m_type = t;
}
Publication::Type
type() const
{
util::Lock l(&m_mutex);
return m_type;
}
void
format(const std::shared_ptr< Format > &f)
{
util::Lock l(&m_mutex);
m_format = f;
}
std::shared_ptr< Format >
format() const
{
util::Lock l(&m_mutex);
return m_format;
}
std::ostream &
print(std::ostream &stream) const;
};
inline std::ostream &
operator<<(std::ostream &stream, const Description &d)
{
return d.print(stream);
}
/// A metric id is what we will actually deal with in terms of metrics, in
/// order to make things like static initialisation cleaner.
class Id
{
const Description *m_description;
public:
Id() : m_description(nullptr)
{
}
Id(const Description *description) : m_description(description)
{
}
const Description *&
description()
{
return m_description;
}
const Description *const &
description() const
{
return m_description;
}
constexpr bool
valid() const noexcept
{
return m_description != nullptr;
}
explicit constexpr operator bool() const noexcept
{
return valid();
}
const Category *
category() const
{
assert(valid());
return m_description->category();
}
const char *
categoryName() const
{
assert(valid());
return m_description->category()->name();
}
const char *
metricName() const
{
assert(valid());
return m_description->name();
}
std::ostream &
print(std::ostream &stream, int, int) const
{
if(m_description)
{
stream << *m_description;
}
else
{
stream << "INVALID_METRIC";
}
return stream;
}
};
inline bool
operator==(const Id &lhs, const Id &rhs)
{
return lhs.description() == rhs.description();
}
inline bool
operator<(const Id &lhs, const Id &rhs)
{
return lhs.description() < rhs.description();
}
inline std::ostream &
operator<<(std::ostream &stream, const Id &id)
{
return id.print(stream, -1, -1);
}
class Record
{
Id m_id;
size_t m_count;
double m_total;
double m_min;
double m_max;
public:
static const double DEFAULT_MIN;
static const double DEFAULT_MAX;
Record()
: m_id()
, m_count(0)
, m_total(0.0)
, m_min(DEFAULT_MIN)
, m_max(DEFAULT_MAX)
{
}
explicit Record(const Id &id)
: m_id(id)
, m_count(0)
, m_total(0.0)
, m_min(DEFAULT_MIN)
, m_max(DEFAULT_MAX)
{
}
Record(const Id &id, size_t count, double total, double min, double max)
: m_id(id), m_count(count), m_total(total), m_min(min), m_max(max)
{
}
// clang-format off
const Id& id() const { return m_id; }
Id& id() { return m_id; }
size_t count() const { return m_count; }
size_t& count() { return m_count; }
double total() const { return m_total; }
double& total() { return m_total; }
double min() const { return m_min; }
double& min() { return m_min; }
double max() const { return m_max; }
double& max() { return m_max; }
// clang-format on
std::ostream &
print(std::ostream &stream, int level, int spaces) const;
};
inline std::ostream &
operator<<(std::ostream &stream, const Record &rec)
{
return rec.print(stream, -1, -1);
}
inline bool
operator==(const Record &lhs, const Record &rhs)
{
return (lhs.id() == rhs.id() && lhs.count() == rhs.count()
&& lhs.total() == rhs.total() && lhs.min() == rhs.min()
&& lhs.max() == rhs.max());
}
class SampleGroup
{
absl::Span< const Record > m_records;
absl::Duration m_samplePeriod;
public:
using const_iterator = absl::Span< const Record >::const_iterator;
SampleGroup() : m_records(), m_samplePeriod()
{
}
SampleGroup(const Record *records, size_t size,
absl::Duration samplePeriod)
: m_records(records, size), m_samplePeriod(samplePeriod)
{
}
SampleGroup(const absl::Span< const Record > &records,
absl::Duration samplePeriod)
: m_records(records), m_samplePeriod(samplePeriod)
{
}
// clang-format off
void samplePeriod(absl::Duration duration) { m_samplePeriod = duration; }
absl::Duration samplePeriod() const { return m_samplePeriod; }
void records(absl::Span<const Record> recs) { m_records = recs; }
absl::Span<const Record> records() const { return m_records; }
bool empty() const { return m_records.empty(); }
size_t size() const { return m_records.size(); }
const_iterator begin() const { return m_records.begin(); }
const_iterator end() const { return m_records.end(); }
// clang-format on
std::ostream &
print(std::ostream &stream, int level, int spaces) const;
};
inline std::ostream &
operator<<(std::ostream &stream, const SampleGroup &group)
{
return group.print(stream, -1, -1);
}
inline bool
operator==(const SampleGroup &lhs, const SampleGroup &rhs)
{
return lhs.records() == rhs.records()
&& lhs.samplePeriod() == rhs.samplePeriod();
}
class Sample
{
absl::Time m_sampleTime;
std::vector< SampleGroup > m_samples;
size_t m_recordCount;
public:
using const_iterator = std::vector< SampleGroup >::const_iterator;
Sample() : m_sampleTime(), m_recordCount(0)
{
}
// clang-format off
void sampleTime(const absl::Time& time) { m_sampleTime = time; }
absl::Time sampleTime() const { return m_sampleTime; }
void pushGroup(const SampleGroup& group) {
if (!group.empty()) {
m_samples.push_back(group);
m_recordCount += group.size();
}
}
void pushGroup(const Record *records, size_t size, absl::Duration duration) {
if (size != 0) {
m_samples.emplace_back(records, size, duration);
m_recordCount += size;
}
}
void pushGroup(const absl::Span< const Record > &records,absl::Duration duration) {
if (!records.empty()) {
m_samples.emplace_back(records, duration);
m_recordCount += records.size();
}
}
void clear() {
m_samples.clear();
m_recordCount = 0;
}
const SampleGroup& group(size_t index) {
assert(index < m_samples.size());
return m_samples[index];
}
const_iterator begin() const { return m_samples.begin(); }
const_iterator end() const { return m_samples.end(); }
size_t groupCount() const { return m_samples.size(); }
size_t recordCount() const { return m_recordCount; }
// clang-format on
};
} // namespace metrics
} // namespace llarp
#endif

1
llarp/util/stopwatch.cpp Normal file
View File

@ -0,0 +1 @@
#include <util/stopwatch.hpp>

55
llarp/util/stopwatch.hpp Normal file
View File

@ -0,0 +1,55 @@
#ifndef LLARP_STOPWATCH_HPP
#define LLARP_STOPWATCH_HPP
#include <absl/types/optional.h>
#include <absl/time/clock.h>
namespace llarp
{
namespace util
{
class Stopwatch
{
absl::optional< absl::Time > m_start;
absl::optional< absl::Time > m_stop;
public:
Stopwatch()
{
}
void
start()
{
assert(!m_start);
assert(!m_stop);
m_start.emplace(absl::Now());
}
void
stop()
{
assert(m_start);
assert(!m_stop);
m_stop.emplace(absl::Now());
}
bool
done() const
{
return m_start && m_stop;
}
absl::Duration
time() const
{
assert(m_start);
assert(m_stop);
return m_stop.value() - m_start.value();
}
};
} // namespace util
} // namespace llarp
#endif

View File

@ -35,12 +35,15 @@ list(APPEND TEST_SRC
util/test_llarp_util_bits.cpp
util/test_llarp_util_encode.cpp
util/test_llarp_util_ini.cpp
util/test_llarp_util_metrics_core.cpp
util/test_llarp_util_metrics_publisher.cpp
util/test_llarp_util_metrics_types.cpp
util/test_llarp_util_object.cpp
util/test_llarp_util_printer.cpp
util/test_llarp_util_queue_manager.cpp
util/test_llarp_util_queue.cpp
util/test_llarp_utils_scheduler.cpp
util/test_llarp_util_thread_pool.cpp
util/test_llarp_utils_scheduler.cpp
util/test_llarp_util_timerqueue.cpp
util/test_llarp_util_traits.cpp
)

View File

@ -4,6 +4,8 @@
#include <util/fs.hpp>
#include <util/types.hpp>
#include <bitset>
namespace llarp
{
namespace test
@ -37,6 +39,55 @@ namespace llarp
}
};
template < typename T >
struct CombinationIterator
{
std::vector< T > toCombine;
std::vector< T > currentCombo;
int bits;
int maxBits;
void
createCombo()
{
currentCombo.clear();
for(size_t i = 0; i < toCombine.size(); ++i)
{
if(bits & (1 << i))
{
currentCombo.push_back(toCombine[i]);
}
}
}
CombinationIterator(const std::vector< T > &values)
: toCombine(values), bits(0), maxBits((1 << values.size()) - 1)
{
currentCombo.reserve(values.size());
createCombo();
}
bool
next()
{
if(bits >= maxBits)
{
return false;
}
++bits;
createCombo();
return true;
}
bool
includesElement(size_t index)
{
return bits & (1 << index);
}
};
} // namespace test
} // namespace llarp

View File

@ -0,0 +1,898 @@
#include <util/metrics_core.hpp>
#include <array>
#include <thread>
#include <test_util.hpp>
#include <gtest/gtest.h>
#include <gmock/gmock.h>
using namespace llarp;
using namespace metrics;
using namespace ::testing;
MATCHER(IsValid, "")
{
return arg.valid();
}
static const Category STAT_CAT("A", true);
static const Description desc_A(&STAT_CAT, "A");
static const Description *DESC_A = &desc_A;
static const Description desc_B(&STAT_CAT, "B");
static const Description *DESC_B = &desc_B;
static const Id METRIC_A(DESC_A);
static const Id METRIC_B(DESC_B);
template < typename T >
class CollectorTest : public ::testing::Test
{
};
TYPED_TEST_SUITE_P(CollectorTest);
TYPED_TEST_P(CollectorTest, Collector)
{
TypeParam collector1(METRIC_A);
TypeParam collector2(METRIC_B);
ASSERT_EQ(METRIC_A, collector1.id().description());
ASSERT_EQ(METRIC_B, collector2.id().description());
Record record1 = collector1.load();
ASSERT_EQ(METRIC_A, record1.id().description());
ASSERT_EQ(0, record1.count());
ASSERT_EQ(0, record1.total());
ASSERT_EQ(Record::DEFAULT_MAX, record1.max());
ASSERT_EQ(Record::DEFAULT_MIN, record1.min());
Record record2 = collector2.load();
ASSERT_EQ(METRIC_B, record2.id().description());
ASSERT_EQ(0, record2.count());
ASSERT_EQ(0, record2.total());
ASSERT_EQ(Record::DEFAULT_MIN, record2.min());
ASSERT_EQ(Record::DEFAULT_MAX, record2.max());
collector1.tick(1);
record1 = collector1.load();
ASSERT_EQ(METRIC_A, record1.id().description());
ASSERT_EQ(1, record1.count());
ASSERT_EQ(1, record1.total());
ASSERT_EQ(1, record1.min());
ASSERT_EQ(1, record1.max());
collector1.tick(2);
record1 = collector1.load();
ASSERT_EQ(METRIC_A, record1.id().description());
ASSERT_EQ(2, record1.count());
ASSERT_EQ(3, record1.total());
ASSERT_EQ(1, record1.min());
ASSERT_EQ(2, record1.max());
collector1.tick(-5);
record1 = collector1.load();
ASSERT_EQ(METRIC_A, record1.id().description());
ASSERT_EQ(3, record1.count());
ASSERT_EQ(-2, record1.total());
ASSERT_EQ(-5, record1.min());
ASSERT_EQ(2, record1.max());
collector1.clear();
record1 = collector1.load();
ASSERT_EQ(METRIC_A, record1.id().description());
ASSERT_EQ(0, record1.count());
ASSERT_EQ(0, record1.total());
ASSERT_EQ(Record::DEFAULT_MIN, record1.min());
ASSERT_EQ(Record::DEFAULT_MAX, record1.max());
collector1.tick(3);
record1 = collector1.loadAndClear();
ASSERT_EQ(METRIC_A, record1.id().description());
ASSERT_EQ(1, record1.count());
ASSERT_EQ(3, record1.total());
ASSERT_EQ(3, record1.min());
ASSERT_EQ(3, record1.max());
record1 = collector1.load();
ASSERT_EQ(METRIC_A, record1.id().description());
ASSERT_EQ(0, record1.count());
ASSERT_EQ(0, record1.total());
ASSERT_EQ(Record::DEFAULT_MIN, record1.min());
ASSERT_EQ(Record::DEFAULT_MAX, record1.max());
}
REGISTER_TYPED_TEST_SUITE_P(CollectorTest, Collector);
using CollectorTestTypes = ::testing::Types< DoubleCollector, IntCollector >;
INSTANTIATE_TYPED_TEST_SUITE_P(MetricsCore, CollectorTest, CollectorTestTypes);
TEST(MetricsCore, Registry)
{
Registry registry;
Id idA = registry.add("MyCategory", "MetricA");
Id invalidId = registry.add("MyCategory", "MetricA");
ASSERT_THAT(invalidId, Not(IsValid()));
Id idA_copy1 = registry.get("MyCategory", "MetricA");
ASSERT_THAT(idA_copy1, IsValid());
ASSERT_EQ(idA_copy1, idA);
Id idA_copy2 = registry.findId("MyCategory", "MetricA");
ASSERT_THAT(idA_copy2, IsValid());
ASSERT_EQ(idA_copy2, idA);
Id idB = registry.get("MyCategory", "MetricB");
ASSERT_THAT(idB, IsValid());
ASSERT_EQ(idB, registry.get("MyCategory", "MetricB"));
ASSERT_EQ(idB, registry.findId("MyCategory", "MetricB"));
ASSERT_THAT(registry.add("MyCategory", "MetricB"), Not(IsValid()));
const Category *myCategory = registry.get("MyCategory");
ASSERT_EQ(myCategory, idA.category());
ASSERT_EQ(myCategory, idB.category());
ASSERT_TRUE(myCategory->enabled());
registry.enable(myCategory, false);
ASSERT_FALSE(myCategory->enabled());
}
TEST(MetricsCore, RegistryAddr)
{
Registry registry;
const Category *CAT_A = registry.add("A");
const Category *CAT_B = registry.get("B");
Id METRIC_AA = registry.add("A", "A");
Id METRIC_AB = registry.add("A", "B");
Id METRIC_AC = registry.add("A", "C");
Id METRIC_BA = registry.get("B", "A");
Id METRIC_BB = registry.get("B", "B");
Id METRIC_BD = registry.get("B", "D");
const Category *CAT_C = registry.add("C");
const Category *CAT_D = registry.add("D");
Id METRIC_EE = registry.add("E", "E");
Id METRIC_FF = registry.get("F", "F");
ASSERT_EQ(CAT_A->name(), METRIC_AA.metricName());
ASSERT_EQ(CAT_B->name(), METRIC_AB.metricName());
ASSERT_EQ(CAT_A->name(), METRIC_BA.metricName());
ASSERT_EQ(CAT_B->name(), METRIC_BB.metricName());
ASSERT_EQ(CAT_C->name(), METRIC_AC.metricName());
ASSERT_EQ(CAT_D->name(), METRIC_BD.metricName());
ASSERT_EQ(METRIC_EE.metricName(), METRIC_EE.categoryName());
ASSERT_EQ(METRIC_FF.metricName(), METRIC_FF.categoryName());
}
TEST(MetricsCore, RegistryOps)
{
struct
{
const char *d_category;
const char *d_name;
} METRICS[] = {
{
"",
"",
},
{"C0", "M0"},
{"C0", "M1"},
{"C1", "M2"},
{"C3", "M3"},
};
const size_t NUM_METRICS = sizeof METRICS / sizeof *METRICS;
{
std::set< std::string > categoryNames;
Registry registry;
for(size_t i = 0; i < NUM_METRICS; ++i)
{
const char *CATEGORY = METRICS[i].d_category;
const char *NAME = METRICS[i].d_name;
categoryNames.insert(CATEGORY);
// Add a new id and verify the returned properties.
Id id = registry.add(CATEGORY, NAME);
ASSERT_TRUE(id.valid()) << id;
ASSERT_NE(nullptr, id.description());
ASSERT_NE(nullptr, id.category());
ASSERT_STREQ(id.metricName(), NAME);
ASSERT_STREQ(id.categoryName(), CATEGORY);
ASSERT_TRUE(id.category()->enabled());
// Attempt to find the id.
Id foundId = registry.findId(CATEGORY, NAME);
ASSERT_TRUE(foundId.valid());
ASSERT_EQ(foundId, id);
// Attempt to add the id a second time
Id invalidId = registry.add(CATEGORY, NAME);
ASSERT_FALSE(invalidId.valid());
// Attempt to find the category.
const Category *foundCat = registry.findCategory(CATEGORY);
ASSERT_EQ(id.category(), foundCat);
ASSERT_EQ(nullptr, registry.add(CATEGORY));
ASSERT_EQ(i + 1, registry.metricCount());
ASSERT_EQ(categoryNames.size(), registry.categoryCount());
}
ASSERT_EQ(NUM_METRICS, registry.metricCount());
ASSERT_EQ(categoryNames.size(), registry.categoryCount());
const Category *NEW_CAT = registry.add("NewCategory");
ASSERT_NE(nullptr, NEW_CAT);
ASSERT_STREQ("NewCategory", NEW_CAT->name());
ASSERT_TRUE(NEW_CAT->enabled());
}
const char *CATEGORIES[] = {"", "A", "B", "CAT_A", "CAT_B", "name"};
const size_t NUM_CATEGORIES = sizeof CATEGORIES / sizeof *CATEGORIES;
{
Registry registry;
for(size_t i = 0; i < NUM_CATEGORIES; ++i)
{
const char *CATEGORY = CATEGORIES[i];
const Category *cat = registry.add(CATEGORY);
ASSERT_NE(nullptr, cat);
ASSERT_STREQ(cat->name(), CATEGORY);
ASSERT_TRUE(cat->enabled());
ASSERT_EQ(nullptr, registry.add(CATEGORY));
ASSERT_EQ(cat, registry.findCategory(CATEGORY));
Id id = registry.add(CATEGORY, "Metric");
ASSERT_TRUE(id.valid());
ASSERT_EQ(cat, id.category());
ASSERT_STREQ(id.categoryName(), CATEGORY);
ASSERT_STREQ(id.metricName(), "Metric");
ASSERT_EQ(i + 1, registry.metricCount());
ASSERT_EQ(i + 1, registry.categoryCount());
}
}
}
MATCHER_P6(RecordEq, category, name, count, total, min, max, "")
{
// clang-format off
return (
arg.id().categoryName() == std::string(category) &&
arg.id().metricName() == std::string(name) &&
arg.count() == count &&
arg.total() == total &&
arg.min() == min &&
arg.max() == max
);
// clang-format on
}
MATCHER_P5(RecordEq, id, count, total, min, max, "")
{
// clang-format off
return (
arg.id() == id &&
arg.count() == count &&
arg.total() == total &&
arg.min() == min &&
arg.max() == max
);
// clang-format on
}
MATCHER_P4(RecordEq, count, total, min, max, "")
{
// clang-format off
return (
arg.count() == count &&
arg.total() == total &&
arg.min() == min &&
arg.max() == max
);
// clang-format on
}
MATCHER_P5(RecordCatEq, category, count, total, min, max, "")
{
// clang-format off
return (
arg.id().categoryName() == std::string(category) &&
arg.count() == count &&
arg.total() == total &&
arg.min() == min &&
arg.max() == max
);
// clang-format on
}
TEST(MetricsCore, RepoBasic)
{
Registry registry;
CollectorRepo repo(&registry);
DoubleCollector *collector1 = repo.defaultDoubleCollector("Test", "C1");
DoubleCollector *collector2 = repo.defaultDoubleCollector("Test", "C2");
IntCollector *intCollector1 = repo.defaultIntCollector("Test", "C3");
IntCollector *intCollector2 = repo.defaultIntCollector("Test", "C4");
ASSERT_NE(collector1, collector2);
ASSERT_EQ(collector1, repo.defaultDoubleCollector("Test", "C1"));
ASSERT_NE(intCollector1, intCollector2);
ASSERT_EQ(intCollector1, repo.defaultIntCollector("Test", "C3"));
collector1->tick(1.0);
collector1->tick(2.0);
collector2->tick(4.0);
intCollector1->tick(5);
intCollector2->tick(6);
std::vector< Record > records = repo.collectAndClear(registry.get("Test"));
ASSERT_THAT(records, SizeIs(4));
// clang-format off
ASSERT_THAT(
records,
ElementsAre(
RecordEq("Test", "C1", 2u, 3, 1, 2),
RecordEq("Test", "C2", 1u, 4, 4, 4),
RecordEq("Test", "C3", 1u, 5, 5, 5),
RecordEq("Test", "C4", 1u, 6, 6, 6)
)
);
// clang-format on
for(const auto &rec : records)
{
std::cout << rec << std::endl;
}
}
TEST(MetricsCore, RepoCollect)
{
Registry registry;
std::array< const char *, 3 > CATEGORIES = {"A", "B", "C"};
std::array< const char *, 3 > METRICS = {"A", "B", "C"};
const int NUM_COLS = 3;
for(int i = 0; i < static_cast< int >(CATEGORIES.size()); ++i)
{
CollectorRepo repo(&registry);
for(int j = 0; j < static_cast< int >(CATEGORIES.size()); ++j)
{
const char *CATEGORY = CATEGORIES[j];
for(int k = 0; k < static_cast< int >(METRICS.size()); ++k)
{
Id metric = registry.get(CATEGORY, METRICS[k]);
for(int l = 0; l < NUM_COLS; ++l)
{
DoubleCollector *dCol = repo.addDoubleCollector(metric).get();
IntCollector *iCol = repo.addIntCollector(metric).get();
if(i == j)
{
dCol->set(k, 2 * k, -k, k);
iCol->set(k, 2 * k, -k, k);
}
else
{
dCol->set(100, 100, 100, 100);
iCol->set(100, 100, 100, 100);
}
}
}
}
// Collect records for the metrics we're testing
{
const char *CATEGORY = CATEGORIES[i];
const Category *category = registry.get(CATEGORY);
std::vector< Record > records = repo.collect(category);
ASSERT_THAT(records, SizeIs(static_cast< int >(METRICS.size())));
// clang-format off
ASSERT_THAT(
records,
UnorderedElementsAre(
RecordEq(CATEGORY, "A", 0u, 0, 0, 0),
RecordEq(CATEGORY, "B", 6u, 12, -1, 1),
RecordEq(CATEGORY, "C", 12u, 24, -2, 2)
)
);
// clang-format on
// Validate initial values.
for(int j = 0; j < static_cast< int >(METRICS.size()); ++j)
{
Id metric = registry.get(CATEGORY, METRICS[j]);
auto collectors = repo.allCollectors(metric);
const auto &doubleCols = collectors.first;
const auto &intCols = collectors.second;
for(int k = 0; k < static_cast< int >(doubleCols.size()); ++k)
{
Record E(metric, j, 2 * j, -j, j);
Record record1 = doubleCols[k]->load();
Record record2 = intCols[k]->load();
ASSERT_EQ(record1, E);
ASSERT_EQ(record2, E);
}
}
}
// Verify the collectors for other categories haven't changed.
for(int j = 0; j < static_cast< int >(CATEGORIES.size()); ++j)
{
if(i == j)
{
continue;
}
const char *CATEGORY = CATEGORIES[j];
for(int k = 0; k < static_cast< int >(METRICS.size()); ++k)
{
Id metric = registry.get(CATEGORY, METRICS[j]);
auto collectors = repo.allCollectors(metric);
const auto &doubleCols = collectors.first;
const auto &intCols = collectors.second;
for(int l = 0; l < static_cast< int >(doubleCols.size()); ++l)
{
Record record1 = doubleCols[k]->load();
ASSERT_THAT(record1, RecordEq(metric, 100u, 100, 100, 100));
Record record2 = intCols[k]->load();
ASSERT_THAT(record2, RecordEq(metric, 100u, 100, 100, 100));
}
}
}
}
}
MATCHER_P2(WithinWindow, expectedTime, window, "")
{
auto begin = expectedTime - window;
auto end = expectedTime + window;
return (begin < arg && arg < end);
}
const Category *
firstCategory(const SampleGroup &group)
{
EXPECT_THAT(group, Not(IsEmpty()));
const Category *value = group.begin()->id().category();
for(const Record &record : group.records())
{
EXPECT_EQ(value, record.id().category());
}
return value;
}
TEST(MetricsCore, ManagerCollectSample1)
{
const char *CATEGORIES[] = {"A", "B", "C", "Test", "12312category"};
const int NUM_CATEGORIES = sizeof(CATEGORIES) / sizeof(*CATEGORIES);
const char *METRICS[] = {"A", "B", "C", "MyMetric", "90123metric"};
const int NUM_METRICS = sizeof(METRICS) / sizeof(*METRICS);
Manager manager;
CollectorRepo &rep = manager.collectorRepo();
for(int i = 0; i < NUM_CATEGORIES; ++i)
{
for(int j = 0; j < NUM_METRICS; ++j)
{
rep.defaultDoubleCollector(CATEGORIES[i], METRICS[j])->tick(1);
}
}
absl::Time start = absl::Now();
std::this_thread::sleep_for(std::chrono::microseconds(100000));
std::vector< Record > records;
Sample sample = manager.collectSample(records, false);
absl::Duration window = absl::Now() - start;
absl::Time now = absl::Now();
ASSERT_EQ(NUM_CATEGORIES * NUM_METRICS, records.size());
ASSERT_EQ(NUM_CATEGORIES * NUM_METRICS, sample.recordCount());
ASSERT_EQ(NUM_CATEGORIES, sample.groupCount());
ASSERT_THAT(sample.sampleTime(), WithinWindow(now, absl::Milliseconds(10)));
for(size_t i = 0; i < sample.groupCount(); ++i)
{
const SampleGroup &group = sample.group(i);
ASSERT_EQ(NUM_METRICS, group.size());
ASSERT_THAT(group.samplePeriod(),
WithinWindow(window, absl::Milliseconds(10)))
<< group;
const char *name = group.records()[0].id().categoryName();
for(const Record &record : group.records())
{
ASSERT_THAT(record, RecordCatEq(name, 1u, 1, 1, 1));
}
}
for(size_t i = 0; i < NUM_CATEGORIES; ++i)
{
for(size_t j = 0; j < NUM_METRICS; ++j)
{
DoubleCollector *col =
rep.defaultDoubleCollector(CATEGORIES[i], METRICS[j]);
Record record = col->load();
ASSERT_THAT(record, RecordEq(1u, 1, 1, 1));
}
}
records.clear();
sample = manager.collectSample(records, true);
ASSERT_EQ(NUM_CATEGORIES * NUM_METRICS, records.size());
ASSERT_EQ(NUM_CATEGORIES * NUM_METRICS, sample.recordCount());
ASSERT_EQ(NUM_CATEGORIES, sample.groupCount());
for(size_t i = 0; i < NUM_CATEGORIES; ++i)
{
for(size_t j = 0; j < NUM_METRICS; ++j)
{
DoubleCollector *col =
rep.defaultDoubleCollector(CATEGORIES[i], METRICS[j]);
Record record = col->load();
ASSERT_EQ(Record(record.id()), record);
}
}
}
TEST(MetricsCore, ManagerCollectSample2)
{
const char *CATEGORIES[] = {"A", "B", "C", "Test", "12312category"};
const int NUM_CATEGORIES = sizeof(CATEGORIES) / sizeof(*CATEGORIES);
const char *METRICS[] = {"A", "B", "C", "MyMetric", "90123metric"};
const int NUM_METRICS = sizeof(METRICS) / sizeof(*METRICS);
Manager manager;
std::vector< const Category * > allCategories;
CollectorRepo &rep = manager.collectorRepo();
Registry &reg = manager.registry();
for(size_t i = 0; i < NUM_CATEGORIES; ++i)
{
const Category *cat = reg.get(CATEGORIES[i]);
ASSERT_NE(nullptr, cat);
allCategories.push_back(cat);
}
test::CombinationIterator< const Category * > combIt{allCategories};
do
{
for(size_t i = 0; i < NUM_CATEGORIES; ++i)
{
for(size_t j = 0; j < NUM_METRICS; ++j)
{
DoubleCollector *col =
rep.defaultDoubleCollector(CATEGORIES[i], METRICS[j]);
col->clear();
col->tick(1);
}
}
// Test without a reset.
std::vector< const Category * > cats = combIt.currentCombo;
std::vector< Record > records;
Sample sample = manager.collectSample(
records, absl::Span< const Category * >{cats}, false);
ASSERT_EQ(NUM_METRICS * cats.size(), sample.recordCount());
ASSERT_EQ(cats.size(), sample.groupCount());
for(size_t i = 0; i < NUM_CATEGORIES; ++i)
{
// Verify the correct categories are in the sample (once)
const Category *CATEGORY = allCategories[i];
bool found = false;
for(size_t j = 0; j < sample.groupCount(); ++j)
{
if(CATEGORY == firstCategory(sample.group(j)))
{
found = true;
}
}
ASSERT_EQ(found, combIt.includesElement(i));
}
for(size_t i = 0; i < NUM_CATEGORIES; ++i)
{
for(size_t j = 0; j < NUM_METRICS; ++j)
{
DoubleCollector *col =
rep.defaultDoubleCollector(CATEGORIES[i], METRICS[j]);
Record record = col->load();
ASSERT_THAT(record, RecordEq(1u, 1, 1, 1));
}
}
std::vector< Record > records2;
// Test with a reset.
sample = manager.collectSample(records2,
absl::Span< const Category * >{cats}, true);
ASSERT_EQ(NUM_METRICS * cats.size(), sample.recordCount());
ASSERT_EQ(cats.size(), sample.groupCount());
ASSERT_EQ(records, records2);
for(size_t i = 0; i < NUM_CATEGORIES; ++i)
{
// Verify the correct categories are in the sample
const Category *CATEGORY = allCategories[i];
bool found = false;
for(size_t j = 0; j < sample.groupCount(); ++j)
{
if(CATEGORY == firstCategory(sample.group(j)))
{
found = true;
}
}
ASSERT_EQ(found, combIt.includesElement(i));
}
for(size_t i = 0; i < NUM_CATEGORIES; ++i)
{
for(size_t j = 0; j < NUM_METRICS; ++j)
{
DoubleCollector *col =
rep.defaultDoubleCollector(CATEGORIES[i], METRICS[j]);
Record record = col->load();
if(combIt.includesElement(i))
{
ASSERT_EQ(Record(record.id()), record);
}
else
{
ASSERT_THAT(record, RecordEq(1u, 1, 1, 1));
}
}
}
} while(combIt.next());
}
struct MockPublisher : public Publisher
{
std::atomic_int invocations;
std::vector< Record > recordBuffer;
std::vector< Record > sortedRecords;
Sample m_sample;
std::set< absl::Duration > times;
void
publish(const Sample &sample) override
{
invocations++;
m_sample.clear();
recordBuffer.clear();
sortedRecords.clear();
times.clear();
m_sample.sampleTime(sample.sampleTime());
if(sample.recordCount() == 0)
{
return;
}
recordBuffer.reserve(sample.recordCount());
for(const auto &s : sample)
{
auto git = s.begin();
ASSERT_NE(git, s.end());
recordBuffer.push_back(*git);
Record *head = &recordBuffer.back();
for(++git; git != s.end(); ++git)
{
recordBuffer.push_back(*git);
}
m_sample.pushGroup(head, s.size(), s.samplePeriod());
times.insert(s.samplePeriod());
}
sortedRecords = recordBuffer;
std::sort(
sortedRecords.begin(), sortedRecords.end(),
[](const auto &lhs, const auto &rhs) { return lhs.id() < rhs.id(); });
}
void
reset()
{
invocations = 0;
m_sample.clear();
recordBuffer.clear();
sortedRecords.clear();
times.clear();
}
int
indexOf(const Id &id)
{
Record searchRecord(id);
auto it = std::lower_bound(
sortedRecords.begin(), sortedRecords.end(), searchRecord,
[](const auto &lhs, const auto &rhs) { return lhs.id() < rhs.id(); });
if(it == sortedRecords.end())
{
return -1;
}
return (it->id() == id) ? it - sortedRecords.begin() : -1;
}
bool
contains(const Id &id)
{
return indexOf(id) != -1;
}
};
TEST(MetricsCore, ManagerAddCatPub)
{
const char *CATEGORIES[] = {"A", "B", "C", "Test", "12312category"};
const int NUM_CATEGORIES = sizeof(CATEGORIES) / sizeof(*CATEGORIES);
const int NUM_PUBLISHERS = 4;
std::multimap< const char *, std::shared_ptr< Publisher > > publishers;
Manager manager;
Registry &registry = manager.registry();
for(int i = 0; i < NUM_CATEGORIES; ++i)
{
for(int j = 0; j < NUM_PUBLISHERS; ++j)
{
auto globalPub = std::make_shared< MockPublisher >();
manager.addPublisher(CATEGORIES[i], globalPub);
publishers.emplace(CATEGORIES[i], globalPub);
}
}
for(int i = 0; i < NUM_CATEGORIES; ++i)
{
const char *CATEGORY = CATEGORIES[i];
const Category *CAT = registry.get(CATEGORY);
std::vector< Publisher * > results = manager.publishersForCategory(CAT);
ASSERT_EQ(NUM_PUBLISHERS, results.size());
auto it = publishers.lower_bound(CATEGORY);
for(const auto &pub : results)
{
ASSERT_EQ(pub, it->second.get());
++it;
}
}
}
TEST(MetricsCore, ManagerEnableAll)
{
const char *CATEGORIES[] = {"A", "B", "C", "Test", "12312category"};
const int NUM_CATEGORIES = sizeof(CATEGORIES) / sizeof(*CATEGORIES);
Manager manager;
Registry &registry = manager.registry();
for(int i = 0; i < NUM_CATEGORIES; ++i)
{
const Category *CAT = registry.get(CATEGORIES[i]);
ASSERT_TRUE(CAT->enabled());
manager.enableCategory(CAT, false);
ASSERT_FALSE(CAT->enabled());
manager.enableCategory(CAT, true);
ASSERT_TRUE(CAT->enabled());
manager.enableCategory(CATEGORIES[i], false);
ASSERT_FALSE(CAT->enabled());
manager.enableCategory(CATEGORIES[i], true);
ASSERT_TRUE(CAT->enabled());
}
manager.enableAll(false);
for(int i = 0; i < NUM_CATEGORIES; ++i)
{
ASSERT_FALSE(registry.get(CATEGORIES[i])->enabled());
}
manager.enableAll(true);
for(int i = 0; i < NUM_CATEGORIES; ++i)
{
ASSERT_TRUE(registry.get(CATEGORIES[i])->enabled());
}
}
TEST(MetricsCore, PublishAll)
{
const char *CATEGORIES[] = {"A", "B", "C", "Test", "12312category"};
const int NUM_CATEGORIES = sizeof(CATEGORIES) / sizeof(*CATEGORIES);
const char *METRICS[] = {"A", "B", "C", "MyMetric", "903metric"};
const int NUM_METRICS = sizeof(METRICS) / sizeof(*METRICS);
Manager manager;
Registry &registry = manager.registry();
CollectorRepo &repository = manager.collectorRepo();
auto globalPub = std::make_shared< MockPublisher >();
manager.addGlobalPublisher(globalPub);
std::vector< const Category * > allCategories;
for(int i = 0; i < NUM_CATEGORIES; ++i)
{
const Category *CAT = registry.get(CATEGORIES[i]);
auto mockPubCat = std::make_shared< MockPublisher >();
manager.addPublisher(CAT, mockPubCat);
allCategories.push_back(CAT);
}
test::CombinationIterator< const Category * > combIt(allCategories);
do
{
for(int i = 0; i < NUM_CATEGORIES; ++i)
{
for(int j = 0; j < NUM_METRICS; ++j)
{
DoubleCollector *col =
repository.defaultDoubleCollector(CATEGORIES[i], METRICS[j]);
col->clear();
col->tick(1);
}
}
std::set< const Category * > excludedSet;
for(int i = 0; i < NUM_CATEGORIES; ++i)
{
if(!combIt.includesElement(i))
{
excludedSet.insert(allCategories[i]);
}
}
ASSERT_EQ(allCategories.size(),
excludedSet.size() + combIt.currentCombo.size());
// Publish the records.
absl::Time tmStamp = absl::Now();
manager.publishAllExcluding(excludedSet);
if(combIt.currentCombo.empty())
{
ASSERT_EQ(0, globalPub->invocations.load());
}
else
{
ASSERT_EQ(1, globalPub->invocations.load());
ASSERT_THAT(globalPub->m_sample.sampleTime(),
WithinWindow(tmStamp, absl::Milliseconds(10)));
ASSERT_EQ(combIt.currentCombo.size(), globalPub->m_sample.groupCount());
}
// Verify the correct "specific" publishers have been invoked.
for(int i = 0; i < NUM_CATEGORIES; ++i)
{
for(int j = 0; j < NUM_METRICS; ++j)
{
Id id = registry.get(CATEGORIES[i], METRICS[j]);
ASSERT_EQ(combIt.includesElement(i), globalPub->contains(id));
}
const int EXP_INV = combIt.includesElement(i) ? 1 : 0;
std::vector< Publisher * > pubs =
manager.publishersForCategory(allCategories[i]);
MockPublisher *specPub = (MockPublisher *)pubs.front();
ASSERT_EQ(EXP_INV, specPub->invocations.load());
specPub->reset();
}
globalPub->reset();
} while(combIt.next());
}

View File

@ -0,0 +1,32 @@
#include <util/metrics_publishers.hpp>
#include <gtest/gtest.h>
#include <gmock/gmock.h>
using namespace llarp;
TEST(MetricsPublisher, StreamPublisher)
{
metrics::Category myCategory("MyCategory");
metrics::Description descA(&myCategory, "MetricA");
metrics::Description descB(&myCategory, "MetricB");
metrics::Id metricA(&descA);
metrics::Id metricB(&descB);
std::stringstream stream;
metrics::StreamPublisher myPublisher(stream);
std::vector< metrics::Record > records;
records.emplace_back(metricA, 5, 25.0, 6.0, 25.0);
records.emplace_back(metricB, 2, 7.0, 3.0, 11.0);
metrics::Sample sample;
sample.sampleTime(absl::Now());
sample.pushGroup(records.data(), records.size(), absl::Seconds(5));
myPublisher.publish(sample);
std::cout << stream.str();
}

View File

@ -0,0 +1,341 @@
#include <util/metrics_types.hpp>
#include <array>
#include <gtest/gtest.h>
#include <gmock/gmock.h>
using namespace llarp;
using namespace ::testing;
struct MetricFormatSpecTestData
{
float m_scale;
const char *m_spec;
double m_value;
const char *m_expected;
};
struct MetricFormatSpecTest : public TestWithParam< MetricFormatSpecTestData >
{
};
TEST_P(MetricFormatSpecTest, print)
{
auto d = GetParam();
metrics::FormatSpec spec(d.m_scale, d.m_spec);
std::ostringstream stream;
metrics::FormatSpec::format(stream, d.m_value, spec);
ASSERT_EQ(d.m_expected, stream.str());
}
MetricFormatSpecTestData metricFormatTestData[] = {
MetricFormatSpecTestData{0.0, "", 1.5, ""},
MetricFormatSpecTestData{1.0, "%.4f", 1.5, "1.5000"},
MetricFormatSpecTestData{1.0, "%.0f", 2.0, "2"},
MetricFormatSpecTestData{1.0, "%.0f", 1.1, "1"},
MetricFormatSpecTestData{1.0, "%.0f", 1.5, "2"},
MetricFormatSpecTestData{1.0, "%.0f", 1.7, "2"},
MetricFormatSpecTestData{1.0, "%.0f", 3.0, "3"},
MetricFormatSpecTestData{2.0, "%.0f", 3.0, "6"},
MetricFormatSpecTestData{2.0, "%.1f", 1.1, "2.2"}};
INSTANTIATE_TEST_CASE_P(MetricsTypes, MetricFormatSpecTest,
ValuesIn(metricFormatTestData));
TEST(MetricsTypes, Format)
{
metrics::Format format;
format.setSpec(metrics::Publication::Type::Max,
metrics::FormatSpec(1.0, "%0.2f"));
format.setSpec(metrics::Publication::Type::Total,
metrics::FormatSpec(2.0, "%0.3f"));
ASSERT_EQ(nullptr, format.specFor(metrics::Publication::Type::Avg));
auto ptr = format.specFor(metrics::Publication::Type::Total);
ASSERT_NE(nullptr, ptr);
ASSERT_STREQ("%0.3f", ptr->m_format);
ASSERT_DOUBLE_EQ(2.0, ptr->m_scale);
ptr = format.specFor(metrics::Publication::Type::Max);
ASSERT_NE(nullptr, ptr);
ASSERT_STREQ("%0.2f", ptr->m_format);
ASSERT_DOUBLE_EQ(1.0, ptr->m_scale);
format.clear();
ASSERT_EQ(nullptr, format.specFor(metrics::Publication::Type::Total));
ASSERT_EQ(nullptr, format.specFor(metrics::Publication::Type::Max));
}
TEST(MetricsTypes, CatContainer)
{
std::array< metrics::CategoryContainer, 10 > containers;
{
metrics::Category c("A");
for(size_t i = 0; i < containers.size(); ++i)
{
c.registerContainer(&containers[i]);
metrics::CategoryContainer *next = (0 == i) ? 0 : &containers[i - 1];
ASSERT_EQ(&c, containers[i].m_category);
ASSERT_TRUE(containers[i].m_enabled);
ASSERT_EQ(next, containers[i].m_nextCategory);
}
for(size_t i = 0; i < containers.size(); ++i)
{
metrics::CategoryContainer *next = (0 == i) ? 0 : &containers[i - 1];
ASSERT_EQ(&c, containers[i].m_category);
ASSERT_TRUE(containers[i].m_enabled);
ASSERT_EQ(next, containers[i].m_nextCategory);
}
const std::atomic_bool *enabled = &c.enabledRaw();
c.enabled(false);
ASSERT_FALSE(*enabled);
ASSERT_EQ(&c.enabledRaw(), enabled);
for(size_t i = 0; i < containers.size(); ++i)
{
metrics::CategoryContainer *next = (0 == i) ? 0 : &containers[i - 1];
ASSERT_EQ(&c, containers[i].m_category);
ASSERT_FALSE(containers[i].m_enabled);
ASSERT_EQ(next, containers[i].m_nextCategory);
}
c.enabled(true);
ASSERT_TRUE(*enabled);
ASSERT_EQ(&c.enabledRaw(), enabled);
for(size_t i = 0; i < containers.size(); ++i)
{
metrics::CategoryContainer *next = (0 == i) ? 0 : &containers[i - 1];
ASSERT_EQ(&c, containers[i].m_category);
ASSERT_TRUE(containers[i].m_enabled);
ASSERT_EQ(next, containers[i].m_nextCategory);
}
}
for(const auto &container : containers)
{
ASSERT_THAT(container.m_category, IsNull());
ASSERT_FALSE(container.m_enabled);
ASSERT_THAT(container.m_nextCategory, IsNull());
}
}
TEST(MetricsTypes, Record)
{
metrics::Record r;
ASSERT_GT(r.min(), r.max());
}
TEST(MetricsTypes, Sample)
{
metrics::Category myCategory("MyCategory");
metrics::Description descA(&myCategory, "MetricA");
metrics::Description descB(&myCategory, "MetricB");
metrics::Description descC(&myCategory, "MetricC");
metrics::Id metricA(&descA);
metrics::Id metricB(&descB);
metrics::Id metricC(&descC);
absl::Time timeStamp = absl::Now();
metrics::Record recordA(metricA, 0, 0, 0, 0);
metrics::Record recordB(metricB, 1, 2, 3, 4);
metrics::Record recordC(metricC, 4, 3, 2, 1);
metrics::Record buffer1[] = {recordA, recordB};
std::vector< metrics::Record > buffer2;
buffer2.push_back(recordC);
metrics::Sample sample;
sample.sampleTime(timeStamp);
sample.pushGroup(buffer1, sizeof(buffer1) / sizeof(*buffer1),
absl::Seconds(1.0));
sample.pushGroup(buffer2.data(), buffer2.size(), absl::Seconds(2.0));
ASSERT_EQ(timeStamp, sample.sampleTime());
ASSERT_EQ(2u, sample.groupCount());
ASSERT_EQ(3u, sample.recordCount());
ASSERT_EQ(absl::Seconds(1), sample.group(0).samplePeriod());
ASSERT_EQ(buffer1, sample.group(0).records().data());
ASSERT_EQ(2, sample.group(0).size());
ASSERT_EQ(absl::Seconds(2), sample.group(1).samplePeriod());
ASSERT_EQ(buffer2.data(), sample.group(1).records().data());
ASSERT_EQ(1, sample.group(1).size());
for(auto sampleIt = sample.begin(); sampleIt != sample.end(); ++sampleIt)
{
;
for(auto groupIt = sampleIt->begin(); groupIt != sampleIt->end(); ++groupIt)
{
std::cout << *groupIt << std::endl;
}
}
}
struct SampleTest
: public ::testing::TestWithParam< std::pair< absl::Time, std::string > >
{
metrics::Category cat_A;
metrics::Description DESC_A;
metrics::Description DESC_B;
metrics::Description DESC_C;
metrics::Description DESC_D;
metrics::Description DESC_E;
metrics::Description DESC_F;
metrics::Description DESC_G;
metrics::Id id_A;
metrics::Id id_B;
metrics::Id id_C;
metrics::Id id_D;
metrics::Id id_E;
metrics::Id id_F;
metrics::Id id_G;
std::vector< metrics::Record > recordBuffer;
SampleTest()
: cat_A("A", true)
, DESC_A(&cat_A, "A")
, DESC_B(&cat_A, "B")
, DESC_C(&cat_A, "C")
, DESC_D(&cat_A, "D")
, DESC_E(&cat_A, "E")
, DESC_F(&cat_A, "F")
, DESC_G(&cat_A, "G")
, id_A(&DESC_A)
, id_B(&DESC_B)
, id_C(&DESC_C)
, id_D(&DESC_D)
, id_E(&DESC_E)
, id_F(&DESC_F)
, id_G(&DESC_G)
{
recordBuffer.emplace_back(metrics::Id(0), 1, 1, 1, 1);
recordBuffer.emplace_back(id_A, 2, 2, 2, 2);
recordBuffer.emplace_back(id_B, 3, 3, 3, 3);
recordBuffer.emplace_back(id_C, 4, 4, 4, 4);
recordBuffer.emplace_back(id_D, 5, 5, 5, 5);
recordBuffer.emplace_back(id_E, 6, 6, 6, 6);
recordBuffer.emplace_back(id_F, 7, 7, 7, 7);
recordBuffer.emplace_back(id_G, 8, 8, 8, 8);
recordBuffer.emplace_back(id_A, 9, 9, 9, 9);
}
};
std::pair< std::vector< metrics::SampleGroup >, size_t >
generate(const std::string &specification,
const std::vector< metrics::Record > &recordBuffer)
{
const char *c = specification.c_str();
std::vector< metrics::SampleGroup > groups;
size_t size = 0;
const metrics::Record *head = recordBuffer.data();
const metrics::Record *current = head;
while(*c)
{
int numRecords = *(c + 1) - '0';
int elapsedTime = *(c + 3) - '0';
if(head + recordBuffer.size() < current + numRecords)
{
current = head;
}
groups.emplace_back(current, numRecords, absl::Seconds(elapsedTime));
size += numRecords;
current += numRecords;
c += 4;
}
return {groups, size};
}
TEST_P(SampleTest, basics)
{
absl::Time timestamp;
std::string spec;
std::tie(timestamp, spec) = GetParam();
std::vector< metrics::SampleGroup > groups;
size_t size;
std::tie(groups, size) = generate(spec, recordBuffer);
// Create the sample.
metrics::Sample sample;
sample.sampleTime(timestamp);
for(size_t j = 0; j < groups.size(); ++j)
{
sample.pushGroup(groups[j]);
}
// Test the sample.
ASSERT_EQ(timestamp, sample.sampleTime());
ASSERT_EQ(groups.size(), sample.groupCount());
ASSERT_EQ(size, sample.recordCount());
for(size_t j = 0; j < sample.groupCount(); ++j)
{
ASSERT_EQ(groups[j], sample.group(j));
}
}
TEST_P(SampleTest, append)
{
absl::Time timestamp;
std::string spec;
std::tie(timestamp, spec) = GetParam();
std::vector< metrics::SampleGroup > groups;
size_t size;
std::tie(groups, size) = generate(spec, recordBuffer);
// Create the sample.
metrics::Sample sample;
sample.sampleTime(timestamp);
std::for_each(groups.begin(), groups.end(), [&](const auto &group) {
sample.pushGroup(group.records(), group.samplePeriod());
});
// Test the sample.
ASSERT_EQ(timestamp, sample.sampleTime());
ASSERT_EQ(groups.size(), sample.groupCount());
ASSERT_EQ(size, sample.recordCount());
for(size_t j = 0; j < sample.groupCount(); ++j)
{
ASSERT_EQ(groups[j], sample.group(j));
}
}
absl::Time
fromYYMMDD(int year, int month, int day)
{
return absl::FromCivil(absl::CivilDay(year, month, day), absl::UTCTimeZone());
}
std::pair< absl::Time, std::string > sampleTestData[] = {
{fromYYMMDD(1900, 1, 1), ""},
{fromYYMMDD(1999, 1, 1), "R1E1"},
{fromYYMMDD(1999, 2, 1), "R2E2"},
{fromYYMMDD(2001, 9, 9), "R1E1R2E2"},
{fromYYMMDD(2001, 9, 9), "R3E3R3E3"},
{fromYYMMDD(2009, 9, 9), "R2E4R1E1"},
{fromYYMMDD(2001, 9, 9), "R1E1R2E2R3E3"},
{fromYYMMDD(2001, 9, 9), "R4E1R3E2R2E3R1E4"},
{fromYYMMDD(2001, 9, 9), "R1E1R2E2R1E1R2E2R1E1R2E1R1E2"}};
INSTANTIATE_TEST_CASE_P(MetricsTypes, SampleTest,
::testing::ValuesIn(sampleTestData));