Threading structures for metrics

Michael 2019-03-20 23:18:32 +00:00
parent 50559b6471
commit acfff4ca5c
No known key found for this signature in database
GPG Key ID: 2D51757B47E2434C
11 changed files with 2515 additions and 1 deletion

llarp/CMakeLists.txt

@@ -17,9 +17,11 @@ set(LIB_UTIL_SRC
util/logger.cpp
util/logic.cpp
util/mem.cpp
+util/object.cpp
+util/printer.cpp
util/queue_manager.cpp
util/queue.cpp
-util/printer.cpp
+util/scheduler.cpp
util/status.cpp
util/str.cpp
util/string_view.cpp
@@ -28,6 +30,7 @@ set(LIB_UTIL_SRC
util/threadpool.cpp
util/time.cpp
util/timer.cpp
+util/timerqueue.cpp
util/traits.cpp
util/types.cpp
)

llarp/util/object.cpp Normal file

@@ -0,0 +1 @@
#include <util/object.hpp>

llarp/util/object.hpp Normal file

@@ -0,0 +1,411 @@
#ifndef LLARP_OBJECT_HPP
#define LLARP_OBJECT_HPP
#include <absl/types/optional.h>
#include <util/threading.hpp>
namespace llarp
{
namespace object
{
/// Provide a buffer, capable of holding a single `Value` object.
/// This is useful for node-based data structures.
/// Note:
/// - This union explicitly does *not* manage the lifetime of the object,
/// explicit calls to the constructor/destructor must be made.
template < typename Value >
union Buffer {
private:
// storage explicitly aligned for Value
alignas(Value) char m_buffer[sizeof(Value)];
public:
Value*
address()
{
return reinterpret_cast< Value* >(static_cast< void* >(m_buffer));
}
const Value*
address() const
{
return reinterpret_cast< const Value* >(
static_cast< const void* >(m_buffer));
}
char*
buffer()
{
return m_buffer;
}
const char*
buffer() const
{
return m_buffer;
}
Value&
value()
{
return *reinterpret_cast< Value* >(this);
}
const Value&
value() const
{
return *reinterpret_cast< const Value* >(this);
}
};
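// Illustrative usage sketch (the union never constructs or destroys the
// held object for you, so both must be done explicitly):
//   Buffer< std::string > buf;
//   new(buf.buffer()) std::string("hello");   // placement-construct
//   buf.value().append(" world");             // use the live object
//   buf.value().~basic_string();              // destroy explicitly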
template < typename Value >
class Catalog;
template < typename Value >
class CatalogIterator;
template < typename Value >
class CatalogCleaner
{
Catalog< Value >* m_catalog;
typename Catalog< Value >::Node* m_node;
bool m_shouldDelete;
CatalogCleaner(const CatalogCleaner&) = delete;
CatalogCleaner&
operator=(const CatalogCleaner&) = delete;
public:
explicit CatalogCleaner(Catalog< Value >* catalog)
: m_catalog(catalog), m_node(nullptr), m_shouldDelete(false)
{
}
~CatalogCleaner();
void
manageNode(typename Catalog< Value >::Node* node, bool shouldDelete)
{
m_node = node;
m_shouldDelete = shouldDelete;
}
void
releaseNode()
{
m_node = nullptr;
}
void
release()
{
releaseNode();
m_catalog = nullptr;
}
};
/// A pooling catalog of objects, referred to by a 32-bit handle
template < typename Value >
class Catalog
{
static constexpr int32_t INDEX_MASK = 0x007FFFFF;
static constexpr int32_t BUSY_INDICATOR = 0x00800000;
static constexpr int32_t GENERATION_INC = 0x01000000;
static constexpr int32_t GENERATION_MASK = 0xFF000000;
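// Handle layout: bits 0-22 hold the node's index in m_nodes, bit 23 is a
// "busy" flag set while a value occupies the slot, and bits 24-31 hold a
// generation counter that freeNode() bumps, so a stale handle to a
// recycled slot no longer matches the node's stored handle.
// e.g. 0x03800005 = generation 3, busy, index 5.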
struct Node
{
typedef union {
Buffer< Value > m_buffer;
Node* m_next;
} Payload;
Payload m_payload;
int32_t m_handle;
};
std::vector< Node* > m_nodes;
Node* m_next;
std::atomic_size_t m_size;
mutable util::Mutex m_mutex;
friend class CatalogCleaner< Value >;
friend class CatalogIterator< Value >;
static Value*
getValue(Node* node)
{
return node->m_payload.m_buffer.address();
}
void
freeNode(Node* node)
{
node->m_handle += GENERATION_INC;
node->m_handle &= ~BUSY_INDICATOR;
node->m_payload.m_next = m_next;
m_next = node;
}
Node*
findNode(int32_t handle) const
{
int32_t index = handle & INDEX_MASK;
if(0 > index || index >= (int32_t)m_nodes.size()
|| !(handle & BUSY_INDICATOR))
{
return nullptr;
}
Node* node = m_nodes[index];
return (node->m_handle == handle) ? node : nullptr;
}
public:
Catalog() : m_next(nullptr), m_size(0)
{
}
~Catalog()
{
removeAll();
}
int32_t
add(const Value& value)
{
int32_t handle;
absl::WriterMutexLock l(&m_mutex);
CatalogCleaner< Value > guard(this);
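// The cleaner guard rolls back partially-completed work if constructing
// the Value below throws: depending on how manageNode() was last called,
// it either deletes a freshly allocated node or returns it to the free
// list.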
Node* node;
if(m_next)
{
node = m_next;
m_next = node->m_payload.m_next;
guard.manageNode(node, false);
}
else
{
assert(m_nodes.size() < BUSY_INDICATOR);
node = static_cast< Node* >(operator new(sizeof(Node)));
guard.manageNode(node, true);
m_nodes.push_back(node);
node->m_handle = static_cast< int32_t >(m_nodes.size()) - 1;
guard.manageNode(node, false);
}
node->m_handle |= BUSY_INDICATOR;
handle = node->m_handle;
// construct into the node.
::new(getValue(node)) Value(value);
guard.release();
++m_size;
return handle;
}
bool
remove(int32_t handle, Value* value = nullptr)
{
absl::WriterMutexLock l(&m_mutex);
Node* node = findNode(handle);
if(!node)
{
return false;
}
Value* val = getValue(node);
if(value)
{
*value = *val;
}
val->~Value();
freeNode(node);
--m_size;
return true;
}
void
removeAll(std::vector< Value >* output = nullptr)
{
absl::WriterMutexLock l(&m_mutex);
for(Node* node : m_nodes)
{
if(node->m_handle & BUSY_INDICATOR)
{
Value* value = getValue(node);
if(output)
{
output->push_back(*value);
}
value->~Value();
}
delete node;
}
m_nodes.clear();
m_next = nullptr;
m_size = 0;
}
bool
replace(const Value& newValue, int32_t handle)
{
absl::WriterMutexLock l(&m_mutex);
Node* node = findNode(handle);
if(!node)
{
return false;
}
Value* value = getValue(node);
value->~Value();
// construct into the node.
::new(value) Value(newValue);
return true;
}
absl::optional< Value >
find(int32_t handle)
{
util::Lock l(&m_mutex);
Node* node = findNode(handle);
if(!node)
{
return {};
}
return *getValue(node);
}
size_t
size() const
{
return m_size;
}
/// Introduced for testing only: verify the internal invariants of the
/// catalog.
bool
verify() const;
};
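// Illustrative usage sketch:
//   Catalog< std::string > names;
//   int32_t h = names.add("alice");
//   absl::optional< std::string > v = names.find(h);   // contains "alice"
//   names.replace("bob", h);
//   std::string removed;
//   names.remove(h, &removed);                          // removed == "bob"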
template < typename Value >
class CatalogIterator
{
const Catalog< Value >* m_catalog;
size_t m_index;
public:
CatalogIterator(const Catalog< Value >& catalog)
: m_catalog(&catalog), m_index(-1)
{
m_catalog->m_mutex.ReaderLock();
operator++();
}
~CatalogIterator()
{
m_catalog->m_mutex.ReaderUnlock();
}
void
operator++()
{
m_index++;
while(m_index < m_catalog->m_nodes.size()
&& !(m_catalog->m_nodes[m_index]->m_handle
& Catalog< Value >::BUSY_INDICATOR))
{
++m_index;
}
}
explicit operator bool() const
{
return m_index < m_catalog->m_nodes.size();
}
std::pair< int32_t, Value >
operator()() const
{
auto* node = m_catalog->m_nodes[m_index];
return {node->m_handle, *Catalog< Value >::getValue(node)};
}
};
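// Note: a CatalogIterator holds the catalog's reader lock for its whole
// lifetime, so keep iteration scopes short and do not call add()/remove()/
// replace() from the iterating thread while the iterator is alive
// (absl::Mutex is not reentrant, so that would self-deadlock).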
template < typename Value >
CatalogCleaner< Value >::~CatalogCleaner()
{
if(m_catalog && m_node)
{
if(m_shouldDelete)
{
// We call the destructor elsewhere.
operator delete(m_node);
}
else
{
m_catalog->freeNode(m_node);
}
}
}
template < typename Value >
bool
Catalog< Value >::verify() const
{
absl::WriterMutexLock l(&m_mutex);
if(m_nodes.size() < m_size)
{
return false;
}
size_t busyCount = 0;
for(size_t i = 0; i < m_nodes.size(); i++)
{
if((m_nodes[i]->m_handle & INDEX_MASK) != i)
{
return false;
}
if(m_nodes[i]->m_handle & BUSY_INDICATOR)
{
busyCount++;
}
}
if(m_size != busyCount)
{
return false;
}
size_t freeCount = 0;
for(Node* p = m_next; p != nullptr; p = p->m_payload.m_next)
{
freeCount++;
}
if(freeCount + busyCount != m_nodes.size())
{
return false;
}
return true;
}
} // namespace object
} // namespace llarp
#endif

llarp/util/scheduler.cpp Normal file

@@ -0,0 +1,424 @@
#include <util/scheduler.hpp>
namespace llarp
{
namespace thread
{
const Scheduler::Handle Scheduler::INVALID_HANDLE = -1;
void
Scheduler::dispatch()
{
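// Dispatcher loop: under m_mutex, pull up to 64 due repeat events and 64
// due one-shot events from their queues (or sleep until the earliest
// deadline, or until Signal() is called); then release the mutex and run
// the collected callbacks through m_dispatcher.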
using PendingRepeatItem = TimerQueueItem< RepeatDataPtr >;
std::vector< PendingRepeatItem > pendingRepeats;
while(true)
{
{
util::Lock l(&m_mutex);
if(!m_running.load(std::memory_order_relaxed))
{
return;
}
m_iterationCount++;
size_t newRepeatSize = 0, newEventSize = 0;
absl::Time now = m_clock();
static constexpr size_t MAX_PENDING_REPEAT = 64;
static constexpr size_t MAX_PENDING_EVENTS = 64;
absl::Time minRepeat, minEvent;
m_repeatQueue.popLess(now, MAX_PENDING_REPEAT, &pendingRepeats,
&newRepeatSize, &minRepeat);
m_eventQueue.popLess(now, MAX_PENDING_EVENTS, &m_events,
&newEventSize, &minEvent);
// If there are no pending events to process...
if(pendingRepeats.empty() && m_events.empty())
{
// if there are none in the queue *at all* block until woken
if(newRepeatSize == 0 && newEventSize == 0)
{
m_condition.Wait(&m_mutex);
}
else
{
absl::Time minTime;
if(newRepeatSize == 0)
{
minTime = minEvent;
}
else if(newEventSize == 0)
{
minTime = minRepeat;
}
else
{
minTime = std::min(minRepeat, minEvent);
}
m_condition.WaitWithDeadline(&m_mutex, minTime);
}
continue;
}
}
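// Lock released: run the collected callbacks outside the mutex, merging
// the pending repeats and one-shot events in time order.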
auto repeatIt = pendingRepeats.begin();
m_eventIt = m_events.begin();
while(repeatIt != pendingRepeats.end() && m_eventIt != m_events.end())
{
auto repeatTime = repeatIt->time();
auto eventTime = m_eventIt->time();
if(repeatTime < eventTime)
{
auto data = repeatIt->value();
if(!data->m_isCancelled)
{
m_dispatcher(data->m_callback);
if(!data->m_isCancelled)
{
data->m_handle =
m_repeatQueue.add(repeatTime + data->m_period, data);
}
}
repeatIt++;
}
else
{
m_eventCount--;
m_dispatcher(m_eventIt->value());
m_eventIt++;
}
}
// One of the two batches is exhausted; drain whatever remains in the other.
while(repeatIt != pendingRepeats.end())
{
auto repeatTime = repeatIt->time();
auto data = repeatIt->value();
if(!data->m_isCancelled)
{
m_dispatcher(data->m_callback);
if(!data->m_isCancelled)
{
data->m_handle =
m_repeatQueue.add(repeatTime + data->m_period, data);
}
}
repeatIt++;
}
while(m_eventIt != m_events.end())
{
m_eventCount--;
m_dispatcher(m_eventIt->value());
m_eventIt++;
}
pendingRepeats.clear();
m_events.clear();
}
}
void
Scheduler::yield()
{
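// Called when a caller passed wait = true: spin, signalling the
// dispatcher, until it completes at least one full iteration, so the
// caller knows the dispatcher has observed the change.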
if(m_running.load(std::memory_order_relaxed))
{
if(std::this_thread::get_id() != m_thread.get_id())
{
size_t iterations = m_iterationCount.load(std::memory_order_relaxed);
while(iterations == m_iterationCount.load(std::memory_order_relaxed)
&& m_running.load(std::memory_order_relaxed))
{
m_condition.Signal();
std::this_thread::yield();
}
}
}
}
Scheduler::Scheduler(const EventDispatcher& dispatcher, const Clock& clock)
: m_clock(clock)
, m_dispatcher(dispatcher)
, m_running(false)
, m_iterationCount(0)
, m_eventIt()
, m_repeatCount(0)
, m_eventCount(0)
{
}
Scheduler::~Scheduler()
{
stop();
}
bool
Scheduler::start()
{
util::Lock threadLock(&m_threadMutex);
util::Lock lock(&m_mutex);
if(m_running.load(std::memory_order_relaxed))
{
return true;
}
m_thread = std::thread(&Scheduler::dispatch, this);
m_running = true;
return true;
}
void
Scheduler::stop()
{
util::Lock threadLock(&m_threadMutex);
// Can't join holding the lock. <_<
{
util::Lock lock(&m_mutex);
if(!m_running.load(std::memory_order_relaxed))
{
return;
}
m_running = false;
m_condition.Signal();
}
m_thread.join();
}
Scheduler::Handle
Scheduler::schedule(absl::Time time,
const std::function< void() >& callback,
const EventKey& key)
{
Handle handle;
{
util::Lock lock(&m_mutex);
bool isAtHead = false;
handle = m_eventQueue.add(time, callback, key, &isAtHead);
if(handle == -1)
{
return INVALID_HANDLE;
}
m_eventCount++;
// If we have an event at the top of the queue, wake the dispatcher.
if(isAtHead)
{
m_condition.Signal();
}
}
return handle;
}
bool
Scheduler::reschedule(Handle handle, absl::Time time, bool wait)
{
bool result = false;
{
util::Lock lock(&m_mutex);
bool isAtHead = false;
result = m_eventQueue.update(handle, time, &isAtHead);
if(isAtHead)
{
m_condition.Signal();
}
}
if(result && wait)
{
yield();
}
return result;
}
bool
Scheduler::reschedule(Handle handle, const EventKey& key, absl::Time time,
bool wait)
{
bool result = false;
{
util::Lock lock(&m_mutex);
bool isAtHead = false;
result = m_eventQueue.update(handle, key, time, &isAtHead);
if(isAtHead)
{
m_condition.Signal();
}
}
if(result && wait)
{
yield();
}
return result;
}
bool
Scheduler::cancel(Handle handle, const EventKey& key, bool wait)
{
if(m_eventQueue.remove(handle, key))
{
m_eventCount--;
return true;
}
// Optimise for the dispatcher thread cancelling a pending event.
// On the dispatch thread, so we don't have to lock.
if(std::this_thread::get_id() == m_thread.get_id())
{
for(auto it = m_events.begin() + m_eventCount; it != m_events.end();
++it)
{
if(it->handle() == handle && it->key() == key)
{
m_eventCount--;
m_events.erase(it);
return true;
}
}
// We didn't find it.
return false;
}
if(handle != INVALID_HANDLE && wait)
{
yield();
}
return false;
}
void
Scheduler::cancelAll(bool wait)
{
std::vector< EventItem > events;
m_eventQueue.removeAll(&events);
m_eventCount -= events.size();
if(wait)
{
yield();
}
}
Scheduler::Handle
Scheduler::scheduleRepeat(absl::Duration interval,
const std::function< void() >& callback,
absl::Time startTime)
{
// Assert that we're not giving an empty duration
assert(interval != absl::Duration());
if(startTime == absl::Time())
{
startTime = interval + m_clock();
}
auto repeatData = std::make_shared< RepeatData >(callback, interval);
{
util::Lock l(&m_mutex);
bool isAtHead = false;
repeatData->m_handle =
m_repeatQueue.add(startTime, repeatData, &isAtHead);
if(repeatData->m_handle == -1)
{
return INVALID_HANDLE;
}
m_repeatCount++;
if(isAtHead)
{
m_condition.Signal();
}
}
return m_repeats.add(repeatData);
}
bool
Scheduler::cancelRepeat(Handle handle, bool wait)
{
RepeatDataPtr data;
if(!m_repeats.remove(handle, &data))
{
return false;
}
m_repeatCount--;
if(!m_repeatQueue.remove(data->m_handle))
{
data->m_isCancelled = true;
if(wait)
{
yield();
}
}
return true;
}
void
Scheduler::cancelAllRepeats(bool wait)
{
std::vector< RepeatDataPtr > repeats;
m_repeats.removeAll(&repeats);
m_repeatCount -= repeats.size();
for(auto& repeat : repeats)
{
repeat->m_isCancelled = true;
}
// if we fail to remove something, we *may* have a pending repeat event in
// the dispatcher
bool somethingFailed = false;
for(auto& repeat : repeats)
{
if(!m_repeatQueue.remove(repeat->m_handle))
{
somethingFailed = true;
}
}
if(wait && somethingFailed)
{
yield();
}
}
} // namespace thread
} // namespace llarp

llarp/util/scheduler.hpp Normal file

@@ -0,0 +1,230 @@
#ifndef LLARP_SCHEDULER_HPP
#define LLARP_SCHEDULER_HPP
#include <util/object.hpp>
#include <util/timerqueue.hpp>
#include <absl/time/time.h>
#include <atomic>
#include <functional>
#include <thread>
#include <vector>
namespace llarp
{
namespace thread
{
/// This is a general purpose event scheduler, supporting both one-off and
/// repeated events.
///
/// Notes:
/// - Events will not be dispatched before their scheduled time
/// - Events may start an arbitrary amount of time after they are scheduled,
/// e.g. if a previous long-running event is still executing.
class Scheduler
{
public:
using Callback = std::function< void() >;
using Handle = int;
static const Handle INVALID_HANDLE;
// Define our own clock so we can test easier
using Clock = std::function< absl::Time() >;
private:
/// struct for repeated events
struct RepeatData
{
Callback m_callback;
absl::Duration m_period;
std::atomic_bool m_isCancelled;
Handle m_handle;
RepeatData(const Callback& callback, absl::Duration period)
: m_callback(callback)
, m_period(period)
, m_isCancelled(false)
, m_handle(0)
{
}
};
using RepeatDataPtr = std::shared_ptr< RepeatData >;
using RepeatQueue = TimerQueue< RepeatDataPtr >;
// Just for naming purposes.
using Event = Callback;
using EventQueue = TimerQueue< Event >;
using EventItem = TimerQueueItem< Event >;
public:
// Looks more horrible than it is.
using EventDispatcher = std::function< void(const Callback&) >;
using EventKey = EventQueue::Key;
private:
Clock m_clock;
EventQueue m_eventQueue;
RepeatQueue m_repeatQueue;
object::Catalog< RepeatDataPtr > m_repeats;
util::Mutex m_threadMutex ACQUIRED_BEFORE(m_mutex); // protects running
util::Mutex m_mutex ACQUIRED_AFTER(m_threadMutex); // master mutex
absl::CondVar m_condition;
EventDispatcher m_dispatcher;
std::thread m_thread;
std::atomic_bool m_running;
std::atomic_size_t m_iterationCount;
std::vector< EventItem > m_events;
std::vector< EventItem >::iterator m_eventIt;
std::atomic_size_t m_repeatCount;
std::atomic_size_t m_eventCount;
Scheduler(const Scheduler&) = delete;
Scheduler&
operator=(const Scheduler&) = delete;
friend class DispatcherImpl;
friend class Tardis;
/// Dispatch thread function
void
dispatch();
/// Yield to the dispatch thread
void
yield();
public:
/// Return the epoch from which `Duration`s are measured.
static absl::Time
epoch()
{
return absl::UnixEpoch();
}
static EventDispatcher
defaultDispatcher()
{
return [](const Callback& callback) { callback(); };
}
static Clock
defaultClock()
{
return &absl::Now;
}
Scheduler() : Scheduler(defaultDispatcher(), defaultClock())
{
}
explicit Scheduler(const EventDispatcher& dispatcher)
: Scheduler(dispatcher, defaultClock())
{
}
explicit Scheduler(const Clock& clock)
: Scheduler(defaultDispatcher(), clock)
{
}
Scheduler(const EventDispatcher& dispatcher, const Clock& clock);
~Scheduler();
/// Start the scheduler
/// Note that currently this cannot fail and return `false`; if thread
/// spawning fails, an exception is thrown instead.
bool
start();
void
stop();
Handle
schedule(absl::Time time, const Callback& callback,
const EventKey& key = EventKey(nullptr));
bool
reschedule(Handle handle, absl::Time time, bool wait = false);
bool
reschedule(Handle handle, const EventKey& key, absl::Time time,
bool wait = false);
bool
cancel(Handle handle, bool wait = false)
{
return cancel(handle, EventKey(nullptr), wait);
}
bool
cancel(Handle handle, const EventKey& key, bool wait = false);
void
cancelAll(bool wait = false);
Handle
scheduleRepeat(absl::Duration interval, const Callback& callback,
absl::Time startTime = absl::Time());
bool
cancelRepeat(Handle handle, bool wait = false);
void
cancelAllRepeats(bool wait = false);
size_t
repeatCount() const
{
return m_repeatCount;
}
size_t
eventCount() const
{
return m_eventCount;
}
};
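// Illustrative usage sketch:
//   Scheduler sched;
//   sched.start();
//   sched.schedule(absl::Now() + absl::Seconds(5), []() { /* one-shot */ });
//   Scheduler::Handle h =
//       sched.scheduleRepeat(absl::Seconds(1), []() { /* every second */ });
//   ...
//   sched.cancelRepeat(h);
//   sched.stop();
// Tardis (below) swaps the Scheduler's clock for a manually advanced one so
// tests can fast-forward time deterministically.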
class Tardis
{
mutable util::Mutex m_mutex;
absl::Time m_time;
Scheduler& m_scheduler;
public:
Tardis(Scheduler& scheduler) : m_time(absl::Now()), m_scheduler(scheduler)
{
m_scheduler.m_clock = std::bind(&Tardis::now, this);
}
void
advanceTime(absl::Duration duration)
{
{
absl::WriterMutexLock l(&m_mutex);
m_time += duration;
}
{
absl::WriterMutexLock l(&m_scheduler.m_mutex);
m_scheduler.m_condition.Signal();
}
}
absl::Time
now() const
{
absl::ReaderMutexLock l(&m_mutex);
return m_time;
}
};
} // namespace thread
} // namespace llarp
#endif

llarp/util/timerqueue.cpp Normal file

@@ -0,0 +1 @@
#include <util/timerqueue.hpp>

llarp/util/timerqueue.hpp Normal file

@@ -0,0 +1,748 @@
#ifndef LLARP_UTIL_TIMERQUEUE_HPP
#define LLARP_UTIL_TIMERQUEUE_HPP
#include <util/object.hpp>
#include <util/threading.hpp>
#include <atomic>
#include <absl/time/time.h>
#include <absl/types/optional.h>
#include <map>
namespace llarp
{
namespace thread
{
template < typename Value >
class TimerQueueItem;
template < typename Value >
class TimerQueue
{
static constexpr int INDEX_BITS_MIN = 8;
static constexpr int INDEX_BITS_MAX = 24;
static constexpr int INDEX_BITS_DEFAULT = 17;
public:
using Handle = int;
static constexpr Handle INVALID_HANDLE = -1;
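// Handle layout: the low indexBits bits store the node's slot index plus
// one (so a live handle is never 0), and the remaining high bits hold an
// iteration counter that freeNode() advances, invalidating handles to
// slots that have since been recycled.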
class Key
{
const void* m_key;
public:
explicit Key(const void* key) : m_key(key)
{
}
explicit Key(int value) : m_key(reinterpret_cast< const void* >(value))
{
}
bool
operator==(const Key& other) const
{
return m_key == other.m_key;
}
bool
operator!=(const Key& other) const
{
return m_key != other.m_key;
}
};
private:
struct Node
{
int m_index;
absl::Time m_time;
Key m_key;
Node* m_prev;
Node* m_next;
object::Buffer< Value > m_value;
Node()
: m_index(0)
, m_time()
, m_key(nullptr)
, m_prev(nullptr)
, m_next(nullptr)
, m_value()
{
}
explicit Node(const absl::Time& time)
: m_index(0)
, m_time(time)
, m_key(nullptr)
, m_prev(nullptr)
, m_next(nullptr)
, m_value()
{
}
};
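// All nodes scheduled for the same absl::Time are linked into a circular
// doubly-linked list; m_nodeMap keeps a single entry per distinct time,
// pointing at the head of that list.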
using NodeMap = std::map< absl::Time, Node* >;
using MapIterator = typename NodeMap::iterator;
const int m_indexMask;
const int m_indexIterationMask;
const int m_indexIterationInc;
mutable util::Mutex m_mutex;
std::vector< Node* > m_nodes GUARDED_BY(m_mutex);
std::atomic< Node* > m_nextNode;
NodeMap m_nodeMap GUARDED_BY(m_mutex);
std::atomic_size_t m_size;
void
freeNode(Node* node)
{
node->m_index =
((node->m_index + m_indexIterationInc) & m_indexIterationMask)
| (node->m_index & m_indexMask);
if(!(node->m_index & m_indexIterationMask))
{
node->m_index += m_indexIterationInc;
}
node->m_prev = nullptr;
}
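// putFreeNode/putFreeNodeList destroy the stored Value(s) in place and
// push the node(s) onto the m_nextNode free list with a compare-exchange
// loop, so recycling a node does not require holding m_mutex.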
void
putFreeNode(Node* node)
{
// destroy in place
node->m_value.value().~Value();
Node* nextFreeNode = m_nextNode;
node->m_next = nextFreeNode;
while(!m_nextNode.compare_exchange_strong(nextFreeNode, node))
{
nextFreeNode = m_nextNode;
node->m_next = nextFreeNode;
}
}
void
putFreeNodeList(Node* node)
{
if(node)
{
node->m_value.value().~Value();
Node* end = node;
while(end->m_next)
{
end = end->m_next;
end->m_value.value().~Value();
}
Node* nextFreeNode = m_nextNode;
end->m_next = nextFreeNode;
while(!m_nextNode.compare_exchange_strong(nextFreeNode, node))
{
nextFreeNode = m_nextNode;
end->m_next = nextFreeNode;
}
}
}
TimerQueue(const TimerQueue&) = delete;
TimerQueue&
operator=(const TimerQueue&) = delete;
public:
TimerQueue()
: m_indexMask((1 << INDEX_BITS_DEFAULT) - 1)
, m_indexIterationMask(~m_indexMask)
, m_indexIterationInc(m_indexMask + 1)
, m_nextNode(nullptr)
, m_size(0)
{
}
explicit TimerQueue(int indexBits)
: m_indexMask((1 << indexBits) - 1)
, m_indexIterationMask(~m_indexMask)
, m_indexIterationInc(m_indexMask + 1)
, m_nextNode(nullptr)
, m_size(0)
{
assert(INDEX_BITS_MIN <= indexBits && indexBits <= INDEX_BITS_MAX);
}
~TimerQueue()
{
removeAll();
for(Node* node : m_nodes)
{
delete node;
}
}
/// Add a new `value` to the queue, scheduled for `time`. If not null:
/// - set `isAtHead` to true if the new item is at the front of the
/// queue (i.e. the item with the earliest `time` value).
/// - set `newSize` to be the length of the new queue.
Handle
add(absl::Time time, const Value& value, bool* isAtHead = nullptr,
size_t* newSize = nullptr)
{
return add(time, value, Key(nullptr), isAtHead, newSize);
}
Handle
add(absl::Time time, const Value& value, const Key& key,
bool* isAtHead = nullptr, size_t* newSize = nullptr);
Handle
add(const TimerQueueItem< Value >& value, bool* isAtHead = nullptr,
size_t* newSize = nullptr);
/// Pop the front of the queue into `item` (if not null).
bool
popFront(TimerQueueItem< Value >* item = nullptr,
size_t* newSize = nullptr, absl::Time* newMinTime = nullptr);
/// Pop all records whose time is less than *or* equal to `time`,
/// appending them to `items`.
void
popLess(absl::Time time,
std::vector< TimerQueueItem< Value > >* items = nullptr,
size_t* newSize = nullptr, absl::Time* newMinTime = nullptr);
void
popLess(absl::Time time, size_t maxItems,
std::vector< TimerQueueItem< Value > >* items = nullptr,
size_t* newSize = nullptr, absl::Time* newMinTime = nullptr);
bool
remove(Handle handle, TimerQueueItem< Value >* item = nullptr,
size_t* newSize = nullptr, absl::Time* newMinTime = nullptr)
{
return remove(handle, Key(nullptr), item, newSize, newMinTime);
}
bool
remove(Handle handle, const Key& key,
TimerQueueItem< Value >* item = nullptr, size_t* newSize = nullptr,
absl::Time* newMinTime = nullptr);
void
removeAll(std::vector< TimerQueueItem< Value > >* items = nullptr);
/// Update the `time` for the item referred to by the handle
bool
update(Handle handle, absl::Time time, bool* isNewTop = nullptr)
{
return update(handle, Key(nullptr), time, isNewTop);
}
bool
update(Handle handle, const Key& key, absl::Time time,
bool* isNewTop = nullptr);
size_t
size() const
{
return m_size;
}
bool
isValid(Handle handle) const
{
return isValid(handle, Key(nullptr));
}
bool
isValid(Handle handle, const Key& key) const
{
absl::ReaderMutexLock lock(&m_mutex);
int index = (handle & m_indexMask) - 1;
if(0 > index || index >= static_cast< int >(m_nodes.size()))
{
return false;
}
Node* node = m_nodes[index];
if(node->m_index != handle || node->m_key != key)
{
return false;
}
return true;
}
absl::optional< absl::Time >
nextTime() const
{
absl::ReaderMutexLock lock(&m_mutex);
if(m_nodeMap.empty())
{
return {};
}
return m_nodeMap.begin()->first;
}
};
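// Illustrative usage sketch:
//   TimerQueue< std::string > tq;
//   int h = tq.add(absl::Now() + absl::Seconds(5), "in five seconds");
//   std::vector< TimerQueueItem< std::string > > due;
//   tq.popLess(absl::Now(), &due);  // collects only items whose time has passed
//   tq.remove(h);                   // cancel the still-pending item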
template < typename Value >
class TimerQueueItem
{
public:
using Handle = typename TimerQueue< Value >::Handle;
using Key = typename TimerQueue< Value >::Key;
private:
absl::Time m_time;
Value m_value;
Handle m_handle;
Key m_key;
public:
TimerQueueItem() : m_time(), m_value(), m_handle(0), m_key(nullptr)
{
}
TimerQueueItem(absl::Time time, const Value& value, Handle handle)
: m_time(time), m_value(value), m_handle(handle), m_key(nullptr)
{
}
TimerQueueItem(absl::Time time, const Value& value, Handle handle,
const Key& key)
: m_time(time), m_value(value), m_handle(handle), m_key(key)
{
}
// clang-format off
absl::Time& time() { return m_time; }
absl::Time time() const { return m_time; }
Value& value() { return m_value; }
const Value& value() const { return m_value; }
Handle& handle() { return m_handle; }
Handle handle() const { return m_handle; }
Key& key() { return m_key; }
const Key& key() const { return m_key; }
// clang-format on
};
template < typename Value >
typename TimerQueue< Value >::Handle
TimerQueue< Value >::add(absl::Time time, const Value& value,
const Key& key, bool* isAtHead, size_t* newSize)
{
absl::WriterMutexLock lock(&m_mutex);
Node* node;
if(m_nextNode)
{
// Even though we lock, other threads might be freeing nodes
node = m_nextNode;
Node* next = node->m_next;
while(!m_nextNode.compare_exchange_strong(node, next))
{
node = m_nextNode;
next = node->m_next;
}
}
else
{
// The number of nodes cannot grow to a size larger than the range of
// available indices.
if((int)m_nodes.size() >= m_indexMask - 1)
{
return INVALID_HANDLE;
}
node = new Node;
m_nodes.push_back(node);
node->m_index =
static_cast< int >(m_nodes.size()) | m_indexIterationInc;
}
node->m_time = time;
node->m_key = key;
new(node->m_value.buffer()) Value(value);
{
auto it = m_nodeMap.find(time);
if(m_nodeMap.end() == it)
{
node->m_prev = node;
node->m_next = node;
m_nodeMap[time] = node;
}
else
{
node->m_prev = it->second->m_prev;
it->second->m_prev->m_next = node;
node->m_next = it->second;
it->second->m_prev = node;
}
}
++m_size;
if(isAtHead)
{
*isAtHead = m_nodeMap.begin()->second == node && node->m_prev == node;
}
if(newSize)
{
*newSize = m_size;
}
assert(-1 != node->m_index);
return node->m_index;
}
template < typename Value >
typename TimerQueue< Value >::Handle
TimerQueue< Value >::add(const TimerQueueItem< Value >& value,
bool* isAtHead, size_t* newSize)
{
return add(value.time(), value.value(), value.key(), isAtHead, newSize);
}
template < typename Value >
bool
TimerQueue< Value >::popFront(TimerQueueItem< Value >* item,
size_t* newSize, absl::Time* newMinTime)
{
Node* node = nullptr;
{
absl::WriterMutexLock lock(&m_mutex);
auto it = m_nodeMap.begin();
if(m_nodeMap.end() == it)
{
return false;
}
node = it->second;
if(item)
{
item->time() = node->m_time;
item->value() = node->m_value.value();
item->handle() = node->m_index;
item->key() = node->m_key;
}
if(node->m_next != node)
{
node->m_prev->m_next = node->m_next;
node->m_next->m_prev = node->m_prev;
if(it->second == node)
{
it->second = node->m_next;
}
}
else
{
m_nodeMap.erase(it);
}
freeNode(node);
--m_size;
if(m_size && newMinTime && !m_nodeMap.empty())
{
*newMinTime = m_nodeMap.begin()->first;
}
if(newSize)
{
*newSize = m_size;
}
}
putFreeNode(node);
return true;
}
template < typename Value >
void
TimerQueue< Value >::popLess(absl::Time time,
std::vector< TimerQueueItem< Value > >* items,
size_t* newSize, absl::Time* newMinTime)
{
Node* begin = nullptr;
{
absl::WriterMutexLock lock(&m_mutex);
auto it = m_nodeMap.begin();
while(m_nodeMap.end() != it && it->first <= time)
{
Node* const first = it->second;
Node* const last = first->m_prev;
Node* node = first;
do
{
if(items)
{
items->emplace_back(it->first, node->m_value.value(),
node->m_index, node->m_key);
}
freeNode(node);
node = node->m_next;
--m_size;
} while(node != first);
last->m_next = begin;
begin = first;
auto condemned = it;
++it;
m_nodeMap.erase(condemned);
}
if(newSize)
{
*newSize = m_size;
}
if(m_nodeMap.end() != it && newMinTime)
{
*newMinTime = it->first;
}
}
putFreeNodeList(begin);
}
template < typename Value >
void
TimerQueue< Value >::popLess(absl::Time time, size_t maxItems,
std::vector< TimerQueueItem< Value > >* items,
size_t* newSize, absl::Time* newMinTime)
{
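// Bounded variant: pops at most maxItems due entries. If a time bucket is
// only partially drained, the remaining nodes are re-linked and the map
// entry updated so the next call resumes where this one stopped.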
Node* begin = nullptr;
{
absl::WriterMutexLock lock(&m_mutex);
auto it = m_nodeMap.begin();
while(m_nodeMap.end() != it && it->first <= time && 0 < maxItems)
{
Node* const first = it->second;
Node* const last = first->m_prev;
Node* node = first;
Node* prevNode = first->m_prev;
do
{
if(items)
{
items->emplace_back(it->first, node->m_value.value(),
node->m_index, node->m_key);
}
freeNode(node);
prevNode = node;
node = node->m_next;
--m_size;
--maxItems;
} while(0 < maxItems && node != first);
prevNode->m_next = begin;
begin = first;
if(node == first)
{
auto condemned = it;
++it;
m_nodeMap.erase(condemned);
}
else
{
node->m_prev = last;
last->m_next = node;
it->second = node;
break;
}
}
if(newSize)
{
*newSize = m_size;
}
if(m_nodeMap.end() != it && newMinTime)
{
*newMinTime = it->first;
}
}
putFreeNodeList(begin);
}
template < typename Value >
bool
TimerQueue< Value >::remove(Handle handle, const Key& key,
TimerQueueItem< Value >* item, size_t* newSize,
absl::Time* newMinTime)
{
Node* node = nullptr;
{
absl::WriterMutexLock lock(&m_mutex);
int index = (handle & m_indexMask) - 1;
if(index < 0 || index >= (int)m_nodes.size())
{
return false;
}
node = m_nodes[index];
if(node->m_index != (int)handle || node->m_key != key
|| nullptr == node->m_prev)
{
return false;
}
if(item)
{
item->time() = node->m_time;
item->value() = node->m_value.value();
item->handle() = node->m_index;
item->key() = node->m_key;
}
if(node->m_next != node)
{
node->m_prev->m_next = node->m_next;
node->m_next->m_prev = node->m_prev;
auto it = m_nodeMap.find(node->m_time);
if(it->second == node)
{
it->second = node->m_next;
}
}
else
{
m_nodeMap.erase(node->m_time);
}
freeNode(node);
--m_size;
if(newSize)
{
*newSize = m_size;
}
if(m_size && newMinTime)
{
assert(!m_nodeMap.empty());
*newMinTime = m_nodeMap.begin()->first;
}
}
putFreeNode(node);
return true;
}
template < typename Value >
void
TimerQueue< Value >::removeAll(
std::vector< TimerQueueItem< Value > >* items)
{
Node* begin = nullptr;
{
absl::WriterMutexLock lock(&m_mutex);
auto it = m_nodeMap.begin();
while(m_nodeMap.end() != it)
{
Node* const first = it->second;
Node* const last = first->m_prev;
Node* node = first;
do
{
if(items)
{
items->emplace_back(it->first, node->m_value.value(),
node->m_index, node->m_key);
}
freeNode(node);
node = node->m_next;
--m_size;
} while(node != first);
last->m_next = begin;
begin = first;
auto condemned = it;
++it;
m_nodeMap.erase(condemned);
}
}
putFreeNodeList(begin);
}
template < typename Value >
bool
TimerQueue< Value >::update(Handle handle, const Key& key, absl::Time time,
bool* isNewTop)
{
absl::WriterMutexLock lock(&m_mutex);
int index = (handle & m_indexMask) - 1;
if(index < 0 || index >= (int)m_nodes.size())
{
return false;
}
Node* node = m_nodes[index];
if(node->m_index != handle || node->m_key != key)
{
return false;
}
if(node->m_prev != node)
{
node->m_prev->m_next = node->m_next;
node->m_next->m_prev = node->m_prev;
auto it = m_nodeMap.find(node->m_time);
if(it->second == node)
{
it->second = node->m_next;
}
}
else
{
m_nodeMap.erase(node->m_time);
}
node->m_time = time;
auto it = m_nodeMap.find(time);
if(m_nodeMap.end() == it)
{
node->m_prev = node;
node->m_next = node;
m_nodeMap[time] = node;
}
else
{
node->m_prev = it->second->m_prev;
it->second->m_prev->m_next = node;
node->m_next = it->second;
it->second->m_prev = node;
}
if(isNewTop)
{
*isNewTop = m_nodeMap.begin()->second == node && node->m_prev == node;
}
return true;
}
} // namespace thread
} // namespace llarp
#endif

test/CMakeLists.txt

@@ -35,10 +35,13 @@ list(APPEND TEST_SRC
util/test_llarp_util_bits.cpp
util/test_llarp_util_encode.cpp
util/test_llarp_util_ini.cpp
+util/test_llarp_util_object.cpp
util/test_llarp_util_printer.cpp
util/test_llarp_util_queue_manager.cpp
util/test_llarp_util_queue.cpp
+util/test_llarp_utils_scheduler.cpp
util/test_llarp_util_thread_pool.cpp
+util/test_llarp_util_timerqueue.cpp
util/test_llarp_util_traits.cpp
)

test/util/test_llarp_util_object.cpp Normal file

@@ -0,0 +1,161 @@
#include <util/object.hpp>
#include <array>
#include <thread>
#include <gtest/gtest.h>
#include <gmock/gmock.h>
using namespace llarp::object;
TEST(Object, VerifySize)
{
static_assert(sizeof(Buffer< char >) == sizeof(char), "");
static_assert(sizeof(Buffer< int >) == sizeof(int), "");
static_assert(sizeof(Buffer< double >) == sizeof(double), "");
static_assert(sizeof(Buffer< std::string >) == sizeof(std::string), "");
}
TEST(Object, Inplace)
{
// Verify we can create and destroy a type with a non-trivial destructor
Buffer< std::vector< std::string > > strBuf;
new(strBuf.buffer()) std::vector< std::string >(100, "abc");
strBuf.value().~vector();
}
TEST(Catalog, smoke)
{
const double value1 = 1.0;
const double value2 = 2.0;
int handle1 = -1;
int handle2 = -1;
Catalog< double > catalog;
handle1 = catalog.add(value1);
catalog.remove(handle1);
for(size_t j = 0; j < 5; ++j)
{
for(size_t i = 1; i < 256; ++i)
{
ASSERT_FALSE(catalog.find(handle1));
handle2 = catalog.add(value2);
catalog.remove(handle2);
}
handle2 = catalog.add(value2);
ASSERT_EQ(handle1, handle2);
ASSERT_TRUE(catalog.find(handle1));
absl::optional< double > result = catalog.find(handle1);
ASSERT_TRUE(result);
ASSERT_EQ(value2, result);
catalog.remove(handle2);
}
}
TEST(Catalog, Iterator)
{
static constexpr size_t THREAD_COUNT = 10;
static constexpr size_t ITERATION_COUNT = 1000;
std::array< std::thread, THREAD_COUNT + 3 > threads;
using llarp::util::Barrier;
using Iterator = CatalogIterator< int >;
using Cat = Catalog< int >;
Barrier barrier(THREAD_COUNT + 3);
Cat catalog;
// Repeatedly remove + add values from the catalog
for(size_t i = 0; i < THREAD_COUNT; ++i)
{
threads[i] = std::thread(
[](Barrier *barrier, Cat *catalog, int id) {
barrier->Block();
for(size_t i = 0; i < ITERATION_COUNT; ++i)
{
int h = catalog->add(id);
absl::optional< int > res = catalog->find(h);
ASSERT_TRUE(res);
ASSERT_EQ(res.value(), id);
ASSERT_TRUE(catalog->replace(-id - 1, h));
res = catalog->find(h);
ASSERT_TRUE(res);
ASSERT_EQ(-id - 1, res.value());
int removed = -1;
ASSERT_TRUE(catalog->remove(h, &removed));
ASSERT_EQ(removed, -id - 1);
ASSERT_FALSE(catalog->find(h));
}
},
&barrier, &catalog, i);
}
// Verify the length constraint is never violated
threads[THREAD_COUNT] = std::thread(
[](Barrier *barrier, Cat *catalog) {
barrier->Block();
for(size_t i = 0; i < ITERATION_COUNT; ++i)
{
size_t size = catalog->size();
ASSERT_LE(size, THREAD_COUNT);
}
},
&barrier, &catalog);
// Verify that iteration always produces a valid state
threads[THREAD_COUNT + 1] = std::thread(
[](Barrier *barrier, Cat *catalog) {
barrier->Block();
for(size_t i = 0; i < ITERATION_COUNT; ++i)
{
int arr[100];
size_t size = 0;
for(Iterator it(*catalog); it; ++it)
{
arr[size++] = it().second;
}
for(size_t i = 0; i < size; i++)
{
// value must be valid
bool present = false;
for(int id = 0; id < static_cast< int >(THREAD_COUNT); id++)
{
if(id == arr[i] || -id - 1 == arr[i])
{
present = true;
break;
}
}
ASSERT_TRUE(present);
// no duplicate should be there
for(size_t j = i + 1; j < size; j++)
{
ASSERT_NE(arr[i], arr[j]);
}
}
}
},
&barrier, &catalog);
// And that we don't have an invalid catalog
threads[THREAD_COUNT + 2] = std::thread(
[](Barrier *barrier, Cat *catalog) {
barrier->Block();
for(size_t i = 0; i < ITERATION_COUNT; ++i)
{
catalog->verify();
}
},
&barrier, &catalog);
for(std::thread &t : threads)
{
t.join();
}
}

test/util/test_llarp_util_timerqueue.cpp Normal file

@@ -0,0 +1,338 @@
#include <util/timerqueue.hpp>
#include <absl/synchronization/barrier.h>
#include <sstream>
#include <thread>
#include <gtest/gtest.h>
#include <gmock/gmock.h>
using CharQueue = llarp::thread::TimerQueue< const char* >;
using CharItem = llarp::thread::TimerQueueItem< const char* >;
TEST(TimerQueue, smoke)
{
CharQueue queue;
const absl::Time TA = absl::Time();
const absl::Time TB = TA + absl::Seconds(1);
const absl::Time TC = TB + absl::Seconds(1);
const absl::Time TD = TC + absl::Seconds(1);
const absl::Time TE = TD + absl::Seconds(1);
const char* VA = "hello";
const char* VB = "world,";
const char* VC = "how";
const char* VD = "are";
const char* VE = "you";
int HA = queue.add(TA, VA);
int HB = queue.add(TB, VB);
int HC = queue.add(TC, VC);
int HD = queue.add(TD, VD);
int HE = queue.add(TE, VE);
CharItem tItem;
absl::Time newMinTime;
size_t newSize;
ASSERT_TRUE(queue.popFront(&tItem));
ASSERT_EQ(VA, tItem.value());
ASSERT_EQ(TA, tItem.time());
ASSERT_EQ(HA, tItem.handle());
ASSERT_TRUE(queue.popFront(&tItem, &newSize, &newMinTime));
ASSERT_EQ(3, newSize);
ASSERT_EQ(TC, newMinTime);
ASSERT_EQ(TB, tItem.time());
ASSERT_EQ(VB, tItem.value());
ASSERT_EQ(HB, tItem.handle());
std::vector< CharItem > a1;
queue.popLess(TD, &a1, &newSize, &newMinTime);
ASSERT_EQ(2, a1.size());
ASSERT_EQ(1, newSize);
ASSERT_EQ(TE, newMinTime);
ASSERT_EQ(TC, a1[0].time());
ASSERT_EQ(VC, a1[0].value());
ASSERT_EQ(HC, a1[0].handle());
ASSERT_EQ(TD, a1[1].time());
ASSERT_EQ(VD, a1[1].value());
ASSERT_EQ(HD, a1[1].handle());
std::vector< CharItem > a2;
queue.popLess(TD, &a2, &newSize, &newMinTime);
ASSERT_EQ(0, a2.size());
ASSERT_EQ(1, newSize);
ASSERT_EQ(TE, newMinTime);
std::vector< CharItem > a3;
queue.popLess(TE, &a3, &newSize, &newMinTime);
ASSERT_EQ(1, a3.size());
ASSERT_EQ(0, newSize);
ASSERT_EQ(TE, a3[0].time());
ASSERT_EQ(VE, a3[0].value());
ASSERT_EQ(HE, a3[0].handle());
}
TEST(TimerQueue, KeySmoke)
{
CharQueue x1;
const absl::Time TA = absl::Time();
const absl::Time TB = TA + absl::Seconds(1);
const absl::Time TC = TB + absl::Seconds(1);
const absl::Time TD = TC + absl::Seconds(1);
const absl::Time TE = TD + absl::Seconds(1);
const char* VA = "hello";
const char* VB = "world,";
const char* VC = "how";
const char* VD = "are";
const char* VE = "you";
typedef CharQueue::Key Key;
const Key KA = Key(&TA);
const Key KB = Key(&TB);
const Key KC = Key(382);
const Key KD = Key(123);
const Key KE = Key(&VE);
int HA = x1.add(TA, VA, KA);
int HB = x1.add(TB, VB, KB);
int HC = x1.add(TC, VC, KC);
int HD = x1.add(TD, VD, KD);
int HE = x1.add(TE, VE, KE);
ASSERT_FALSE(x1.remove(HA, KB));
ASSERT_TRUE(x1.isValid(HA, KA));
ASSERT_FALSE(x1.update(HC, KD, TE));
CharItem tItem;
absl::Time newMinTime;
size_t newSize;
ASSERT_TRUE(x1.popFront(&tItem));
ASSERT_EQ(VA, tItem.value());
ASSERT_EQ(TA, tItem.time());
ASSERT_EQ(HA, tItem.handle());
ASSERT_EQ(KA, tItem.key());
ASSERT_TRUE(x1.popFront(&tItem, &newSize, &newMinTime));
ASSERT_EQ(3, newSize);
ASSERT_EQ(TC, newMinTime);
ASSERT_EQ(TB, tItem.time());
ASSERT_EQ(VB, tItem.value());
ASSERT_EQ(HB, tItem.handle());
ASSERT_EQ(KB, tItem.key());
std::vector< CharItem > a1;
x1.popLess(TD, &a1, &newSize, &newMinTime);
ASSERT_EQ(2, a1.size());
ASSERT_EQ(1, newSize);
ASSERT_EQ(TE, newMinTime);
ASSERT_EQ(TC, a1[0].time());
ASSERT_EQ(VC, a1[0].value());
ASSERT_EQ(HC, a1[0].handle());
ASSERT_EQ(KC, a1[0].key());
ASSERT_EQ(TD, a1[1].time());
ASSERT_EQ(VD, a1[1].value());
ASSERT_EQ(HD, a1[1].handle());
ASSERT_EQ(KD, a1[1].key());
std::vector< CharItem > a2;
x1.popLess(TD, &a2, &newSize, &newMinTime);
ASSERT_EQ(0, a2.size());
ASSERT_EQ(1, newSize);
ASSERT_EQ(TE, newMinTime);
std::vector< CharItem > a3;
x1.popLess(TE, &a3, &newSize, &newMinTime);
ASSERT_EQ(1, a3.size());
ASSERT_EQ(0, newSize);
ASSERT_EQ(TE, a3[0].time());
ASSERT_EQ(VE, a3[0].value());
ASSERT_EQ(HE, a3[0].handle());
ASSERT_EQ(KE, a3[0].key());
}
TEST(TimerQueue, Update)
{
const char VA[] = "A";
const char VB[] = "B";
const char VC[] = "C";
const char VD[] = "D";
const char VE[] = "E";
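// Each row gives an initial time (secs, nsecs), the payload, the updated
// time (updsecs, updnsecs), and whether that update should make the item
// the new front of the queue.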
// clang-format off
static const struct
{
int m_secs;
int m_nsecs;
const char* m_value;
int m_updsecs;
int m_updnsecs;
bool m_isNewTop;
} VALUES[] = {
{2, 1000000, VA, 0, 1000000, false},
{2, 1000000, VB, 3, 1000000, false},
{2, 1000000, VC, 0, 4000, false},
{2, 1000001, VB, 0, 3999, true},
{1, 9999998, VC, 4, 9999998, false},
{1, 9999999, VD, 0, 0, true},
{0, 4000, VE, 10, 4000, false}};
// clang-format on
static const int POP_ORDER[] = {5, 3, 2, 0, 1, 4, 6};
const int NUM_VALUES = sizeof VALUES / sizeof *VALUES;
int handles[NUM_VALUES];
CharQueue queue;
{
CharItem item;
ASSERT_FALSE(queue.popFront(&item));
}
for(int i = 0; i < NUM_VALUES; ++i)
{
const char* VAL = VALUES[i].m_value;
const int SECS = VALUES[i].m_secs;
const int NSECS = VALUES[i].m_nsecs;
absl::Time TIME =
absl::Time() + absl::Seconds(SECS) + absl::Nanoseconds(NSECS);
handles[i] = queue.add(TIME, VAL);
ASSERT_EQ(i + 1, queue.size());
ASSERT_TRUE(queue.isValid(handles[i]));
}
for(int i = 0; i < NUM_VALUES; ++i)
{
const int UPDSECS = VALUES[i].m_updsecs;
const bool EXPNEWTOP = VALUES[i].m_isNewTop;
const int UPDNSECS = VALUES[i].m_updnsecs;
absl::Time UPDTIME =
absl::Time() + absl::Seconds(UPDSECS) + absl::Nanoseconds(UPDNSECS);
bool isNewTop;
CharItem item;
ASSERT_TRUE(queue.isValid(handles[i])) << i;
ASSERT_TRUE(queue.update(handles[i], UPDTIME, &isNewTop)) << i;
EXPECT_EQ(EXPNEWTOP, isNewTop) << i;
ASSERT_TRUE(queue.isValid(handles[i])) << i;
}
for(int i = 0; i < NUM_VALUES; ++i)
{
const int I = POP_ORDER[i];
const char* EXPVAL = VALUES[I].m_value;
const int EXPSECS = VALUES[I].m_updsecs;
const int EXPNSECS = VALUES[I].m_updnsecs;
absl::Time EXPTIME =
absl::Time() + absl::Seconds(EXPSECS) + absl::Nanoseconds(EXPNSECS);
CharItem item;
ASSERT_TRUE(queue.isValid(handles[I]));
ASSERT_TRUE(queue.popFront(&item));
ASSERT_EQ(EXPTIME, item.time());
ASSERT_EQ(EXPVAL, item.value());
ASSERT_FALSE(queue.isValid(handles[I]));
}
}
TEST(TimerQueue, ThreadSafety)
{
using Data = std::string;
using StringQueue = llarp::thread::TimerQueue< std::string >;
using StringItem = llarp::thread::TimerQueueItem< std::string >;
using Info = std::pair< int, std::vector< StringItem >* >;
static constexpr size_t NUM_THREADS = 10;
static constexpr size_t NUM_ITERATIONS = 1000;
static constexpr size_t NUM_REMOVE_ALL = NUM_ITERATIONS / 2;
Info info[NUM_THREADS];
std::thread threads[NUM_THREADS + 1];
std::vector< StringItem > items[NUM_THREADS];
absl::Barrier barrier(NUM_THREADS + 1);
StringQueue queue;
for(size_t i = 0; i < NUM_THREADS; ++i)
{
info[i].first = i;
info[i].second = &items[i];
threads[i] = std::thread(
[](Info* info, absl::Barrier* barrier, StringQueue* queue) {
const int THREAD_ID = info->first;
std::vector< StringItem >* vPtr = info->second;
// We stagger the removeAll steps among the threads.
const int STEP_REMOVE_ALL = THREAD_ID * NUM_REMOVE_ALL / NUM_THREADS;
std::ostringstream oss;
oss << THREAD_ID;
Data V(oss.str());
barrier->Block();
size_t newSize;
absl::Time newMinTime;
StringItem item;
for(size_t i = 0; i < NUM_ITERATIONS; ++i)
{
const absl::Time TIME =
absl::Time() + absl::Seconds((i * (i + 3)) % NUM_ITERATIONS);
int h = queue->add(TIME, V);
queue->update(h, TIME);
if(queue->popFront(&item, &newSize, &newMinTime))
{
vPtr->push_back(item);
}
h = queue->add(newMinTime, V);
queue->popLess(newMinTime, vPtr);
if(queue->remove(h, &item, &newSize, &newMinTime))
{
vPtr->push_back(item);
}
if(i % NUM_REMOVE_ALL == STEP_REMOVE_ALL)
{
queue->removeAll(vPtr);
}
}
},
&info[i], &barrier, &queue);
}
threads[NUM_THREADS] = std::thread(
[](absl::Barrier* barrier, StringQueue* queue) {
barrier->Block();
for(size_t i = 0; i < NUM_ITERATIONS; ++i)
{
size_t size = queue->size();
ASSERT_GE(size, 0);
ASSERT_LE(size, NUM_THREADS);
}
},
&barrier, &queue);
size_t size = 0;
for(size_t i = 0; i < NUM_THREADS; ++i)
{
threads[i].join();
size += static_cast< int >(items[i].size());
}
threads[NUM_THREADS].join();
ASSERT_EQ(0, queue.size());
ASSERT_EQ(NUM_ITERATIONS * NUM_THREADS * 2, size);
}

test/util/test_llarp_utils_scheduler.cpp Normal file

@@ -0,0 +1,194 @@
#include <util/scheduler.hpp>
#include <gtest/gtest.h>
#include <gmock/gmock.h>
using namespace llarp;
using thread::Scheduler;
using thread::Tardis;
TEST(SchedulerTest, smoke)
{
Scheduler scheduler;
ASSERT_TRUE(scheduler.start());
scheduler.stop();
}
struct TestCallback
{
std::atomic_size_t m_startCount;
std::atomic_size_t m_execCount;
absl::Duration executeTime;
TestCallback() : m_startCount(0), m_execCount(0), executeTime()
{
}
void
callback()
{
m_startCount++;
if(executeTime != absl::Duration())
{
std::this_thread::sleep_for(absl::ToChronoMilliseconds(executeTime));
}
m_execCount++;
}
void
waitFor(absl::Duration duration, size_t attemptCount,
size_t executeCount) const
{
for(size_t i = 0; i < attemptCount; ++i)
{
if(executeCount + 1 <= m_execCount)
{
return;
}
std::this_thread::sleep_until(absl::ToChronoTime(absl::Now() + duration));
std::this_thread::yield();
}
}
};
TEST(SchedulerTest, fakeTime)
{
// Just test we can mock out Time itself
Scheduler scheduler;
Tardis time{scheduler};
absl::Time now = time.now();
TestCallback callback1, callback2;
Scheduler::Handle handle = scheduler.schedule(
now + absl::Seconds(30), std::bind(&TestCallback::callback, &callback1));
ASSERT_NE(Scheduler::INVALID_HANDLE, handle);
handle = scheduler.scheduleRepeat(
absl::Seconds(60), std::bind(&TestCallback::callback, &callback2));
ASSERT_NE(Scheduler::INVALID_HANDLE, handle);
scheduler.start();
time.advanceTime(absl::Seconds(35));
ASSERT_EQ(time.now(), now + absl::Seconds(35));
callback1.waitFor(absl::Milliseconds(10), 100, 0);
ASSERT_EQ(1u, callback1.m_execCount);
ASSERT_EQ(0u, callback2.m_execCount);
// jump forward another 30 seconds, the repeat event should kick off
time.advanceTime(absl::Seconds(30));
ASSERT_EQ(time.now(), now + absl::Seconds(65));
callback2.waitFor(absl::Milliseconds(10), 100, 0);
ASSERT_EQ(1u, callback1.m_execCount);
ASSERT_EQ(1u, callback2.m_execCount);
// jump forward another minute, the repeat event should have run again
time.advanceTime(absl::Seconds(60));
callback2.waitFor(absl::Milliseconds(10), 100, 1);
ASSERT_EQ(1u, callback1.m_execCount);
ASSERT_EQ(2u, callback2.m_execCount);
scheduler.stop();
}
TEST(SchedulerTest, func1)
{
Scheduler scheduler;
scheduler.start();
TestCallback callback1, callback2;
absl::Time now = absl::Now();
scheduler.scheduleRepeat(absl::Milliseconds(30),
std::bind(&TestCallback::callback, &callback1));
scheduler.schedule(now + absl::Milliseconds(60),
std::bind(&TestCallback::callback, &callback2));
std::this_thread::yield();
std::this_thread::sleep_for(absl::ToChronoMilliseconds(absl::Milliseconds(40)));
callback1.waitFor(absl::Milliseconds(10), 100, 0);
scheduler.stop();
absl::Duration elapsed = absl::Now() - now;
size_t count1 = callback1.m_execCount;
size_t count2 = callback2.m_execCount;
if(elapsed < absl::Milliseconds(60))
{
ASSERT_EQ(1u, count1);
ASSERT_EQ(0u, count2);
}
else
{
ASSERT_LE(1u, count1);
}
callback1.waitFor(absl::Milliseconds(10), 100, 0);
size_t count = callback1.m_execCount;
ASSERT_EQ(count1, count);
count = callback2.m_execCount;
ASSERT_EQ(count2, count);
if(count2 == 0)
{
// callback2 not executed
scheduler.start();
std::this_thread::yield();
std::this_thread::sleep_for(absl::ToChronoMilliseconds(absl::Milliseconds(40)));
callback2.waitFor(absl::Milliseconds(10), 100, count2);
count = callback2.m_execCount;
ASSERT_LE(count2 + 1, count);
}
else
{
ASSERT_LT(absl::Milliseconds(60), elapsed);
}
}
TEST(SchedulerTest, cancelAllRepeats)
{
Scheduler scheduler;
scheduler.start();
TestCallback callback1, callback2;
const Scheduler::Handle handle1 = scheduler.scheduleRepeat(
absl::Milliseconds(30), std::bind(&TestCallback::callback, &callback1));
const Scheduler::Handle handle2 = scheduler.scheduleRepeat(
absl::Milliseconds(30), std::bind(&TestCallback::callback, &callback2));
scheduler.cancelAllRepeats();
ASSERT_FALSE(scheduler.cancelRepeat(handle1));
ASSERT_FALSE(scheduler.cancelRepeat(handle2));
const size_t count1 = callback1.m_execCount;
const size_t count2 = callback2.m_execCount;
std::this_thread::yield();
std::this_thread::sleep_for(absl::ToChronoMilliseconds(absl::Milliseconds(100)));
size_t count = callback1.m_execCount;
ASSERT_EQ(count1, count);
count = callback2.m_execCount;
ASSERT_EQ(count2, count);
scheduler.stop();
}