diff --git a/.vscode/launch.json b/.vscode/launch.json index 8380cc172..682114bca 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -7,8 +7,8 @@ "name": "(lldb) Launch", "type": "cppdbg", "request": "launch", - "program": "${workspaceFolder}/lokinet", - "args": [], + "program": "${workspaceFolder}/build/testAll", + "args": ["--gtest_shuffle", "--gtest_filter=-AbyssTest.TestClientAndServer"], "stopAtEntry": false, "cwd": "${workspaceFolder}", "environment": [], diff --git a/CMakeLists.txt b/CMakeLists.txt index c7ec5088a..cd3f98e9d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -266,6 +266,7 @@ set(LIB_PLATFORM_SRC # for logic llarp/timer.cpp # for threading + llarp/queue_manager.cpp llarp/threadpool.cpp # for android shim ${ANDROID_PLATFORM_SRC} @@ -512,6 +513,7 @@ set(TEST_SRC test/test_dns_unit.cpp test/test_dnsc_unit.cpp test/test_dnsd_unit.cpp + test/test_llarp_queue_manager.cpp ) diff --git a/llarp/queue_manager.cpp b/llarp/queue_manager.cpp new file mode 100644 index 000000000..1c3019c86 --- /dev/null +++ b/llarp/queue_manager.cpp @@ -0,0 +1,562 @@ +#include "queue_manager.hpp" + +#include + +namespace llarp +{ + namespace thread + { + // Turn an enum into its underlying value. + template < typename E > + constexpr auto + to_underlying(E e) noexcept + { + return static_cast< std::underlying_type_t< E > >(e); + } + + static constexpr uint32_t GENERATION_COUNT_SHIFT = 0x2; + + // Max number of generations which can be held in an uint32_t. + static constexpr size_t NUM_ELEMENT_GENERATIONS = 1 + << ((sizeof(uint32_t) * 8) - 2); + + // mask for holding the element state from an element + static constexpr uint32_t ELEMENT_STATE_MASK = 0x3; + + // mask for holding the disabled bit in the index. + static constexpr uint32_t DISABLED_STATE_MASK = 1 + << ((sizeof(uint32_t) * 8) - 1); + + // Max number of combinations of index and generations. 
+ static constexpr uint32_t NUM_COMBINED_INDEXES = DISABLED_STATE_MASK; + + bool + isDisabledFlagSet(uint32_t encodedIndex) + { + return (encodedIndex & DISABLED_STATE_MASK); + } + + uint32_t + discardDisabledFlag(uint32_t encodedIndex) + { + return (encodedIndex & ~DISABLED_STATE_MASK); + } + + uint32_t + encodeElement(uint32_t generation, ElementState state) + { + return (generation << GENERATION_COUNT_SHIFT) | to_underlying(state); + } + + uint32_t + decodeGenerationFromElementState(uint32_t state) + { + return state >> GENERATION_COUNT_SHIFT; + } + + ElementState + decodeStateFromElementState(uint32_t state) + { + return ElementState(state & ELEMENT_STATE_MASK); + } + + QueueManager::AtomicIndex& + QueueManager::pushIndex() + { + return m_pushIndex; + } + + QueueManager::AtomicIndex& + QueueManager::popIndex() + { + return m_popIndex; + } + + const QueueManager::AtomicIndex& + QueueManager::pushIndex() const + { + return m_pushIndex; + } + + const QueueManager::AtomicIndex& + QueueManager::popIndex() const + { + return m_popIndex; + } + + uint32_t + QueueManager::nextCombinedIndex(uint32_t index) const + { + if(m_maxCombinedIndex == index) + { + return 0; + } + + return index + 1; + } + + uint32_t + QueueManager::nextGeneration(uint32_t generation) const + { + if(m_maxGeneration == generation) + { + return 0; + } + + return generation + 1; + } + + size_t + QueueManager::capacity() const + { + return m_capacity; + } + + int32_t + QueueManager::circularDifference(uint32_t startingValue, + uint32_t subtractValue, uint32_t modulo) + { + assert(modulo + <= (static_cast< uint32_t >(std::numeric_limits< int32_t >::max()) + + 1)); + assert(startingValue < modulo); + assert(subtractValue < modulo); + + int32_t difference = startingValue - subtractValue; + if(difference > static_cast< int32_t >(modulo / 2)) + { + return difference - modulo; + } + else if(difference < -static_cast< int32_t >(modulo / 2)) + { + return difference + modulo; + } + else + { + return 
difference;
+      }
+    }
+
+    uint32_t
+    QueueManager::numGenerations(size_t capacity)
+    {
+      assert(capacity != 0);
+
+      return static_cast< uint32_t >(
+          std::min(NUM_COMBINED_INDEXES / capacity, NUM_ELEMENT_GENERATIONS));
+    }
+
+    QueueManager::QueueManager(size_t capacity)
+        : m_pushIndex(0)
+        , m_popIndex(0)
+        , m_capacity(capacity)
+        , m_maxGeneration(numGenerations(capacity) - 1)
+        , m_maxCombinedIndex(
+              numGenerations(capacity) * static_cast< uint32_t >(capacity) - 1)
+    {
+      assert(0 < capacity);
+      assert(capacity <= MAX_CAPACITY);
+      (void)m_pushPadding;
+      (void)m_popPadding;
+
+      m_states = new std::atomic_uint32_t[capacity];
+
+      for(size_t i = 0; i < capacity; ++i)
+      {
+        m_states[i] = 0;
+      }
+    }
+
+    QueueManager::~QueueManager()
+    {
+      delete[] m_states;
+    }
+
+    QueueReturn
+    QueueManager::reservePushIndex(uint32_t& generation, uint32_t& index)
+    {
+      uint32_t loadedPushIndex = pushIndex().load(std::memory_order_relaxed);
+
+      uint32_t savedPushIndex = -1;
+
+      uint32_t combinedIndex = 0;
+      uint32_t currIdx = 0;
+      uint32_t currGen = 0;
+
+      // Use savedPushIndex to make us acquire an index at least twice before
+      // returning QueueFull.
+      // This prevents us from massive contention when we have a queue of size 1
+
+      for(;;)
+      {
+        if(isDisabledFlagSet(loadedPushIndex))
+        {
+          return QueueReturn::QueueDisabled;
+        }
+
+        combinedIndex = discardDisabledFlag(loadedPushIndex);
+
+        currGen = static_cast< uint32_t >(combinedIndex / m_capacity);
+        currIdx = static_cast< uint32_t >(combinedIndex % m_capacity);
+
+        uint32_t compare = encodeElement(currGen, ElementState::Empty);
+        const uint32_t swap = encodeElement(currGen, ElementState::Writing);
+
+        if(m_states[currIdx].compare_exchange_strong(compare, swap))
+        {
+          // We changed the state.
+          generation = currGen;
+          index = currIdx;
+          break;
+        }
+
+        // We failed to reserve the index. Use the result from cmp n swap to
+        // determine if the queue was full or not. Either:
+        // 1.
The cell is from a previous generation (so the queue is full) + // 2. Another cell has reserved this cell for writing, but not commited + // yet + // 3. The push index has been changed between the load and the cmp. + + uint32_t elemGen = decodeGenerationFromElementState(compare); + + int32_t difference = static_cast< int32_t >(currGen - elemGen); + + if(difference == 1 + || (difference == -static_cast< int32_t >(m_maxGeneration))) + { + // Queue is full. + + assert(1 + == circularDifference(currGen, elemGen, m_maxGeneration + 1)); + + ElementState state = decodeStateFromElementState(compare); + + if(state == ElementState::Reading) + { + // Another thread is reading. Yield this thread + std::this_thread::yield(); + loadedPushIndex = pushIndex().load(std::memory_order_relaxed); + continue; + } + + assert(state != ElementState::Empty); + + if(savedPushIndex != loadedPushIndex) + { + // Make another attempt to check the queue is full before failing + std::this_thread::yield(); + savedPushIndex = loadedPushIndex; + loadedPushIndex = pushIndex().load(std::memory_order_relaxed); + continue; + } + + return QueueReturn::QueueFull; + } + + // Another thread has already acquired this cell, try to increment the + // push index and go again. 
+ + assert(0 >= circularDifference(currGen, elemGen, m_maxGeneration + 1)); + + const uint32_t next = nextCombinedIndex(combinedIndex); + pushIndex().compare_exchange_strong(combinedIndex, next); + loadedPushIndex = combinedIndex; + } + + // We got the cell, increment the push index + const uint32_t next = nextCombinedIndex(combinedIndex); + pushIndex().compare_exchange_strong(combinedIndex, next); + + return QueueReturn::Success; + } + + void + QueueManager::commitPushIndex(uint32_t generation, uint32_t index) + { + assert(generation <= m_maxGeneration); + assert(index < m_capacity); + assert(ElementState::Writing + == decodeStateFromElementState(m_states[index])); + assert(generation == decodeGenerationFromElementState(m_states[index])); + + m_states[index] = encodeElement(generation, ElementState::Full); + } + + QueueReturn + QueueManager::reservePopIndex(uint32_t& generation, uint32_t& index) + { + uint32_t loadedPopIndex = popIndex().load(); + uint32_t savedPopIndex = -1; + + uint32_t currIdx = 0; + uint32_t currGen = 0; + + for(;;) + { + currGen = static_cast< uint32_t >(loadedPopIndex / m_capacity); + currIdx = static_cast< uint32_t >(loadedPopIndex % m_capacity); + + // Try to swap this state from full to reading. + + uint32_t compare = encodeElement(currGen, ElementState::Full); + const uint32_t swap = encodeElement(currGen, ElementState::Reading); + + if(m_states[currIdx].compare_exchange_strong(compare, swap)) + { + generation = currGen; + index = currIdx; + break; + } + + // We failed to reserve the index. Use the result from cmp n swap to + // determine if the queue was full or not. Either: + // 1. The cell is from a previous generation (so the queue is empty) + // 2. The cell is from the current generation and empty (so the queue is + // empty) + // 3. The queue is being written to + // 4. The pop index has been changed between the load and the cmp. 
+
+        uint32_t elemGen = decodeGenerationFromElementState(compare);
+        ElementState state = decodeStateFromElementState(compare);
+
+        int32_t difference = static_cast< int32_t >(currGen - elemGen);
+
+        if(difference == 1
+           || (difference == -static_cast< int32_t >(m_maxGeneration)))
+        {
+          // Queue is empty.
+          assert(state == ElementState::Reading);
+          assert(
+              1 == circularDifference(currGen, elemGen, m_maxGeneration + 1));
+
+          return QueueReturn::QueueEmpty;
+        }
+
+        if(difference == 0 && state == ElementState::Empty)
+        {
+          // The cell is empty in the current generation, so the queue is empty
+
+          if(savedPopIndex != loadedPopIndex)
+          {
+            std::this_thread::yield();
+            savedPopIndex = loadedPopIndex;
+            loadedPopIndex = popIndex().load(std::memory_order_relaxed);
+            continue;
+          }
+
+          return QueueReturn::QueueEmpty;
+        }
+
+        if(difference != 0 || state == ElementState::Writing)
+        {
+          // The cell is currently being written to or the index is outdated.
+          // Yield and try again.
+          std::this_thread::yield();
+          loadedPopIndex = popIndex().load(std::memory_order_relaxed);
+          continue;
+        }
+
+        popIndex().compare_exchange_strong(loadedPopIndex,
+                                           nextCombinedIndex(loadedPopIndex));
+      }
+
+      popIndex().compare_exchange_strong(loadedPopIndex,
+                                         nextCombinedIndex(loadedPopIndex));
+
+      return QueueReturn::Success;
+    }
+
+    void
+    QueueManager::commitPopIndex(uint32_t generation, uint32_t index)
+    {
+      assert(generation <= m_maxGeneration);
+      assert(index < m_capacity);
+      assert(decodeStateFromElementState(m_states[index])
+             == ElementState::Reading);
+      assert(generation == decodeGenerationFromElementState(m_states[index]));
+
+      m_states[index] =
+          encodeElement(nextGeneration(generation), ElementState::Empty);
+    }
+
+    void
+    QueueManager::disable()
+    {
+      // Loop until we set the disabled bit
+      for(;;)
+      {
+        uint32_t index = pushIndex();
+
+        if(isDisabledFlagSet(index))
+        {
+          // Queue is already disabled(?!)
+          return;
+        }
+
+        if(pushIndex().compare_exchange_strong(index,
+                                               index | DISABLED_STATE_MASK))
+        {
+          // queue has been disabled
+          return;
+        }
+      }
+    }
+
+    void
+    QueueManager::enable()
+    {
+      for(;;)
+      {
+        uint32_t index = pushIndex();
+
+        if(!isDisabledFlagSet(index))
+        {
+          // queue is already enabled.
+          return;
+        }
+
+        if(pushIndex().compare_exchange_strong(index,
+                                               index & ~DISABLED_STATE_MASK))
+        {
+          // queue has been enabled
+          return;
+        }
+      }
+    }
+
+    bool
+    QueueManager::reservePopForClear(uint32_t& generation, uint32_t& index,
+                                     uint32_t endGeneration, uint32_t endIndex)
+    {
+      assert(endGeneration <= m_maxGeneration);
+      assert(endIndex < m_capacity);
+
+      uint32_t loadedCombinedIndex = popIndex().load(std::memory_order_relaxed);
+
+      for(;;)
+      {
+        uint32_t endCombinedIndex =
+            (endGeneration * static_cast< uint32_t >(m_capacity)) + endIndex;
+
+        if(circularDifference(endCombinedIndex, loadedCombinedIndex,
+                              m_maxCombinedIndex + 1)
+           == 0)
+        {
+          return false;
+        }
+
+        assert(0 < circularDifference(endCombinedIndex, loadedCombinedIndex,
+                                      m_maxCombinedIndex + 1));
+
+        uint32_t currIdx =
+            static_cast< uint32_t >(loadedCombinedIndex % m_capacity);
+        uint32_t currGen =
+            static_cast< uint32_t >(loadedCombinedIndex / m_capacity);
+
+        // Try to swap this cell from Full to Reading.
+        // We only set this to Empty after trying to increment popIndex, so we
+        // don't race against another thread.
+
+        uint32_t compare = encodeElement(currGen, ElementState::Full);
+        const uint32_t swap = encodeElement(currGen, ElementState::Reading);
+
+        if(m_states[currIdx].compare_exchange_strong(compare, swap))
+        {
+          // We've dropped this index.
+
+          generation = currGen;
+          index = currIdx;
+          break;
+        }
+
+        ElementState state = decodeStateFromElementState(compare);
+
+        if(state == ElementState::Writing || state == ElementState::Full)
+        {
+          // Another thread is writing to this cell, or this thread has slept
+          // for too long.
+ std::this_thread::yield(); + loadedCombinedIndex = popIndex().load(std::memory_order_relaxed); + continue; + } + + const uint32_t next = nextCombinedIndex(loadedCombinedIndex); + popIndex().compare_exchange_strong(loadedCombinedIndex, next); + } + + // Attempt to increment the index. + const uint32_t next = nextCombinedIndex(loadedCombinedIndex); + popIndex().compare_exchange_strong(loadedCombinedIndex, next); + + return true; + } + + void + QueueManager::abortPushIndexReservation(uint32_t generation, uint32_t index) + { + assert(generation <= m_maxGeneration); + assert(index < m_capacity); + assert(static_cast< uint32_t >((generation * m_capacity) + index) + == popIndex().load(std::memory_order_relaxed)); + assert(decodeStateFromElementState(m_states[index]) + == ElementState::Writing); + assert(generation == decodeGenerationFromElementState(m_states[index])); + + uint32_t loadedPopIndex = popIndex().load(std::memory_order_relaxed); + + assert(generation == loadedPopIndex / m_capacity); + assert(index == loadedPopIndex % m_capacity); + + m_states[index] = encodeElement(generation, ElementState::Reading); + + const uint32_t nextIndex = nextCombinedIndex(loadedPopIndex); + popIndex().compare_exchange_strong(loadedPopIndex, nextIndex); + + m_states[index] = + encodeElement(nextGeneration(generation), ElementState::Empty); + } + + size_t + QueueManager::size() const + { + // Note that we rely on these loads being sequentially consistent. + + uint32_t combinedPushIndex = discardDisabledFlag(pushIndex()); + uint32_t combinedPopIndex = popIndex(); + + int32_t difference = combinedPushIndex - combinedPopIndex; + + if(difference >= 0) + { + if(difference > static_cast< int32_t >(m_capacity)) + { + // We've raced between getting push and pop indexes, in this case, it + // means the queue is empty. 
+ assert(0 > circularDifference(combinedPushIndex, combinedPopIndex, + m_maxCombinedIndex + 1)); + + return 0; + } + + return static_cast< size_t >(difference); + } + + if(difference < -static_cast< int32_t >(m_maxCombinedIndex / 2)) + { + assert(0 < circularDifference(combinedPushIndex, combinedPopIndex, + m_maxCombinedIndex + 1)); + + difference += m_maxCombinedIndex + 1; + return std::min(static_cast< size_t >(difference), m_capacity); + } + + return 0; + } + + bool + QueueManager::enabled() const + { + return !isDisabledFlagSet(pushIndex().load()); + } + } // namespace thread +} // namespace llarp diff --git a/llarp/queue_manager.hpp b/llarp/queue_manager.hpp new file mode 100644 index 000000000..dc6be93d6 --- /dev/null +++ b/llarp/queue_manager.hpp @@ -0,0 +1,212 @@ +#ifndef LLARP_QUEUE_MANAGER_HPP +#define LLARP_QUEUE_MANAGER_HPP + +#include +#include +#include +#include +#include +#include +#include + +namespace llarp +{ + namespace thread + { + enum class ElementState : uint32_t + { + Empty = 0, + Writing = 1, + Full = 2, + Reading = 3 + }; + + enum class QueueReturn + { + Success, + QueueDisabled, + QueueEmpty, + QueueFull + }; + + inline std::ostream& + operator<<(std::ostream& os, QueueReturn val) + { + switch(val) + { + case QueueReturn::Success: + os << "Success"; + break; + case QueueReturn::QueueDisabled: + os << "QueueDisabled"; + break; + case QueueReturn::QueueEmpty: + os << "QueueEmpty"; + break; + case QueueReturn::QueueFull: + os << "QueueFull"; + break; + } + + return os; + } + + class QueueManager + { + // This class provides thread-safe state management for a queue. + + // Common terminology in this class: + // - "Combined Index": the combination of an index into the circular + // buffer and the generation count. 
Precisely: + // + // Combined Index = (Generation * Capacity) + Element Index + // + // The combined index has the useful property where incrementing the + // index when the element index is at the end of the buffer does two + // things: + // 1. Sets the element index back to 0 + // 2. Increments the generation + + public: + static constexpr size_t Alignment = 64; + + using AtomicIndex = std::atomic_uint32_t; + + private: + AtomicIndex m_pushIndex; // Index in the buffer that the next + // element will be added to. + + char m_pushPadding[Alignment - sizeof(AtomicIndex)]; + + AtomicIndex m_popIndex; // Index in the buffer that the next + // element will be removed from. + + char m_popPadding[Alignment - sizeof(AtomicIndex)]; + + const size_t m_capacity; // max size of the manager. + + const uint32_t m_maxGeneration; // Maximum generation for this object. + + const uint32_t m_maxCombinedIndex; // Maximum combined value of index and + // generation for this object. + + std::atomic_uint32_t* m_states; // Array of index states. + + AtomicIndex& + pushIndex(); + + AtomicIndex& + popIndex(); + + const AtomicIndex& + pushIndex() const; + + const AtomicIndex& + popIndex() const; + + // Return the next combined index + uint32_t + nextCombinedIndex(uint32_t index) const; + + // Return the next generation + uint32_t + nextGeneration(uint32_t generation) const; + + public: + // Return the difference between the startingValue and the subtractValue + // around a particular modulo. + static int32_t + circularDifference(uint32_t startingValue, uint32_t subtractValue, + uint32_t modulo); + + // Return the number of possible generations a circular buffer can hold. + static uint32_t + numGenerations(size_t capacity); + + // The max capacity of the queue manager. + // 2 bits are used for holding the disabled status and the number of + // generations is at least 2. 
+ static constexpr size_t MAX_CAPACITY = 1 << ((sizeof(uint32_t) * 8) - 2); + + explicit QueueManager(size_t capacity); + + ~QueueManager(); + + // Push operations + + // Reserve the next available index to enqueue an element at. On success: + // - Load `index` with the next available index + // - Load `generation` with the current generation + // + // If this call succeeds, other threads may spin until `commitPushIndex` + // is called. + QueueReturn + reservePushIndex(uint32_t& generation, uint32_t& index); + + // Mark the `index` in the given `generation` as in-use. This unblocks + // any other threads which were waiting on the index state. + void + commitPushIndex(uint32_t generation, uint32_t index); + + // Pop operations + + // Reserve the next available index to remove an element from. On success: + // - Load `index` with the next available index + // - Load `generation` with the current generation + // + // If this call succeeds, other threads may spin until `commitPopIndex` + // is called. + QueueReturn + reservePopIndex(uint32_t& generation, uint32_t& index); + + // Mark the `index` in the given `generation` as available. This unblocks + // any other threads which were waiting on the index state. + void + commitPopIndex(uint32_t generation, uint32_t index); + + // Disable the queue + void + disable(); + + // Enable the queue + void + enable(); + + // Exception safety + + // If the next available index an element can be popped from is before + // the `endGeneration` and the `endIndex`, reserve that index into `index` + // and `generation`. + // + // Return true if an index was reserved and false otherwise. + // + // Behaviour is undefined if `endGeneration` and `endIndex` have not been + // acquired for writing. + // + // The intended usage of this method is to help remove all elements if an + // exception is thrown between reserving and committing an index. + // Workflow: + // 1. call reservePopForClear + // 2. 
call commitPopIndex, emptying all cells up to the reserved index + // 3. call abortPushIndexReservation on the index. + bool + reservePopForClear(uint32_t& generation, uint32_t& index, + uint32_t endGeneration, uint32_t endIndex); + + void + abortPushIndexReservation(uint32_t generation, uint32_t index); + + // Accessors + + bool + enabled() const; + + size_t + size() const; + + size_t + capacity() const; + }; + } // namespace thread +} // namespace llarp +#endif diff --git a/test/test_llarp_queue_manager.cpp b/test/test_llarp_queue_manager.cpp new file mode 100644 index 000000000..dc62d530f --- /dev/null +++ b/test/test_llarp_queue_manager.cpp @@ -0,0 +1,1125 @@ +#include + +#include +#include +#include + +using namespace llarp::thread; + +void +generation(QueueManager& manager, uint32_t pushIndex, uint32_t popIndex) +{ + ASSERT_GE(pushIndex, popIndex); + ASSERT_LE(pushIndex - popIndex, manager.capacity()); + + for(uint32_t i = 0; i < popIndex; ++i) + { + uint32_t gen = 0; + uint32_t index = 0; + + (void)manager.reservePushIndex(gen, index); + manager.commitPushIndex(gen, index); + + auto result = manager.reservePopIndex(gen, index); + + ASSERT_EQ(result, QueueReturn::Success); + ASSERT_EQ(index, i % manager.capacity()); + + manager.commitPopIndex(gen, index); + } + + for(uint32_t i = popIndex; i < pushIndex; ++i) + { + uint32_t gen = 0; + uint32_t index = 0; + + auto result = manager.reservePushIndex(gen, index); + ASSERT_EQ(result, QueueReturn::Success); + ASSERT_EQ(index, i % manager.capacity()); + + manager.commitPushIndex(gen, index); + } +} + +class IntQueue +{ + private: + QueueManager manager; + + std::vector< int > data; + + public: + IntQueue(const IntQueue&) = delete; + + explicit IntQueue(size_t capacity) : manager(capacity), data(capacity, 0) + { + } + + ~IntQueue() = default; + + bool + tryPushBack(int value) + { + uint32_t gen = 0; + uint32_t index = 0; + + if(manager.reservePushIndex(gen, index) == QueueReturn::Success) + { + data[index] = value; + 
manager.commitPushIndex(gen, index); + return true; + } + else + { + return false; + } + } + + std::optional< int > + tryPopFront() + { + uint32_t gen = 0; + uint32_t index = 0; + + if(manager.reservePopIndex(gen, index) == QueueReturn::Success) + { + int result = data[index]; + manager.commitPopIndex(gen, index); + return result; + } + else + { + return {}; + } + } + + size_t + size() const + { + return manager.size(); + } + + size_t + capacity() const + { + return manager.capacity(); + } +}; + +// This class exactly mirrors the data of the QueueManager, and is used for +// both debugging and whitebox testing. +struct QueueData +{ + public: + QueueManager::AtomicIndex m_pushIndex; // Index in the buffer that the next + // element will be added to. + + char m_pushPadding[QueueManager::Alignment + - sizeof(QueueManager::AtomicIndex)]; + + QueueManager::AtomicIndex m_popIndex; // Index in the buffer that the next + // element will be removed from. + + char + m_popPadding[QueueManager::Alignment - sizeof(QueueManager::AtomicIndex)]; + + const size_t m_capacity; // max size of the manager. + + const uint32_t m_maxGeneration; // Maximum generation for this object. + + const uint32_t m_maxCombinedIndex; // Maximum combined value of index and + // generation for this object. + + std::uint32_t* m_states; // Array of index states. 
+}; + +static_assert(sizeof(QueueData) == sizeof(QueueManager)); + +static constexpr uint32_t GENERATION_COUNT_SHIFT = 0x2; +static constexpr uint32_t ELEMENT_STATE_MASK = 0x3; + +struct QueueIntrospection +{ + private: + const QueueData* data; + + public: + QueueIntrospection(const QueueManager& manager) + : data(reinterpret_cast< const QueueData* >(&manager)) + { + } + + uint32_t + pushIndex() const + { + return data->m_pushIndex % capacity(); + } + + uint32_t + pushGeneration() const + { + return data->m_pushIndex / capacity(); + } + + uint32_t + popIndex() const + { + return data->m_popIndex % capacity(); + } + + uint32_t + popGeneration() const + { + return data->m_popIndex / capacity(); + } + + uint32_t + elementGen(uint32_t index) const + { + return data->m_states[index] >> GENERATION_COUNT_SHIFT; + } + + ElementState + elementState(uint32_t index) const + { + return static_cast< ElementState >(data->m_states[index] + & ELEMENT_STATE_MASK); + } + + uint32_t + maxGen() const + { + return data->m_maxGeneration; + } + + uint32_t + maxCombinedIndex() const + { + return data->m_maxCombinedIndex; + } + + uint32_t + capacity() const + { + return data->m_capacity; + } +}; + +void +adjustGeneration(QueueManager& manager, uint32_t gen) +{ + QueueData* data = reinterpret_cast< QueueData* >(&manager); + + auto capacity = manager.capacity(); + + for(size_t i = 0; i < capacity; ++i) + { + data->m_states[i] = gen << GENERATION_COUNT_SHIFT; + } + + *reinterpret_cast< QueueManager::AtomicIndex* >(&data->m_pushIndex) = + (gen * capacity); + *reinterpret_cast< QueueManager::AtomicIndex* >(&data->m_popIndex) = + (gen * capacity); +} + +void +dirtyGenerate(QueueManager& manager, uint32_t pushCombinedIndex, + uint32_t popCombinedIndex) +{ + ASSERT_GE(pushCombinedIndex, popCombinedIndex); + ASSERT_LE(pushCombinedIndex - popCombinedIndex, manager.capacity()); + + uint32_t capacity = manager.capacity(); + + uint32_t start = + static_cast< uint32_t >(popCombinedIndex / 
manager.capacity()); + + adjustGeneration(manager, start); + generation(manager, pushCombinedIndex - (start * capacity), + popCombinedIndex - (start * capacity)); +} + +TEST(TestQueueManager, SimpleUsage) +{ + IntQueue queue(2); + + bool rc = queue.tryPushBack(1); + ASSERT_TRUE(rc); + + rc = queue.tryPushBack(2); + ASSERT_TRUE(rc); + + rc = queue.tryPushBack(3); + ASSERT_FALSE(rc); + + ASSERT_EQ(2u, queue.size()); + + auto result = queue.tryPopFront(); + + ASSERT_TRUE(result.has_value()); + ASSERT_EQ(1, result.value()); +} + +class BasicFunctionality : public ::testing::TestWithParam< uint32_t > +{ +}; + +TEST_P(BasicFunctionality, Push) +{ + uint32_t val = GetParam(); + + QueueManager manager(val); + + ASSERT_EQ(0u, manager.size()); + + uint32_t gen = 0; + uint32_t index = 0; + + for(uint32_t i = 0; i < val; ++i) + { + ASSERT_EQ(QueueReturn::Success, manager.reservePushIndex(gen, index)); + ASSERT_EQ(i, index); + ASSERT_EQ(0u, gen); + ASSERT_EQ(i, manager.size() - 1); + manager.commitPushIndex(gen, index); + } + + ASSERT_EQ(QueueReturn::QueueFull, manager.reservePushIndex(gen, index)); + ASSERT_EQ(val, manager.size()); +} + +TEST_P(BasicFunctionality, AcquiringPopIndex) +{ + uint32_t capacity = GetParam(); + + QueueManager manager(capacity); + + ASSERT_EQ(0u, manager.size()); + + uint32_t gen = 0; + uint32_t index = 0; + + for(uint32_t g = 0; g < 3; ++g) + { + for(uint32_t idx = 0; idx < capacity; ++idx) + { + ASSERT_EQ(QueueReturn::QueueEmpty, manager.reservePopIndex(gen, index)); + + ASSERT_EQ(QueueReturn::Success, manager.reservePushIndex(gen, index)); + ASSERT_EQ(g, gen); + ASSERT_EQ(index, idx); + ASSERT_EQ(1u, manager.size()); + + manager.commitPushIndex(gen, index); + ASSERT_EQ(1u, manager.size()); + + ASSERT_EQ(QueueReturn::Success, manager.reservePopIndex(gen, index)); + ASSERT_EQ(g, gen); + ASSERT_EQ(index, idx); + ASSERT_EQ(0u, manager.size()); + + manager.commitPopIndex(gen, index); + ASSERT_EQ(0u, manager.size()); + } + } +} + 
+TEST_P(BasicFunctionality, pushIndex) +{ + uint32_t capacity = GetParam(); + + QueueManager manager(capacity); + + ASSERT_EQ(0u, manager.size()); + + uint32_t generation = 0; + uint32_t index = 0; + + // Fill the queue + for(uint32_t idx = 0; idx < capacity; ++idx) + { + manager.reservePushIndex(generation, index); + manager.commitPushIndex(generation, index); + } + + ASSERT_EQ(capacity, manager.size()); + + for(uint32_t gen = 0; gen < 3; ++gen) + { + for(uint32_t idx = 0; idx < capacity; ++idx) + { + ASSERT_EQ(QueueReturn::QueueFull, + manager.reservePushIndex(generation, index)); + + ASSERT_EQ(QueueReturn::Success, + manager.reservePopIndex(generation, index)); + + ASSERT_EQ(generation, gen); + ASSERT_EQ(index, idx); + ASSERT_EQ(capacity - 1, manager.size()); + + manager.commitPopIndex(generation, index); + ASSERT_EQ(capacity - 1, manager.size()); + + ASSERT_EQ(QueueReturn::Success, + manager.reservePushIndex(generation, index)); + + ASSERT_EQ(generation, gen + 1); + ASSERT_EQ(index, idx); + ASSERT_EQ(manager.size(), capacity); + + manager.commitPushIndex(generation, index); + ASSERT_EQ(manager.size(), capacity); + } + } +} + +INSTANTIATE_TEST_CASE_P(TestQueueManagerBasic, BasicFunctionality, + ::testing::Range(1u, 100u)); + +// Potential issues: +// - That pushing an element at the max combined index will push the next +// element at index 0 +// - That popping an element at the max combined index will pop the next +// element at index 0 +// - That size returns the correct size when the push index has gone past the +// max combined index +// - That reservePopIndexForClear and abortPushIndexReservation clear the +// correct element and increment push/pop + +TEST(TestQueueManagerMaxCombinedIndex, PushAtMax) +{ + QueueManager manager(1); + + QueueIntrospection state{manager}; + + const uint32_t MAX_COMBINED_INDEX = + std::numeric_limits< uint32_t >::max() >> 2; + const uint32_t MAX_GENERATION = std::numeric_limits< uint32_t >::max() >> 2; + + const uint32_t 
maxGeneration = QueueIntrospection(manager).maxGen(); + const uint32_t maxCombinedIndex = + QueueIntrospection(manager).maxCombinedIndex(); + + ASSERT_EQ(maxGeneration, MAX_GENERATION); + ASSERT_EQ(maxCombinedIndex, MAX_COMBINED_INDEX); + + dirtyGenerate(manager, MAX_COMBINED_INDEX, MAX_COMBINED_INDEX); + + uint32_t gen = 0; + uint32_t index = 0; + + ASSERT_EQ(0u, manager.size()); + ASSERT_EQ(QueueReturn::Success, manager.reservePushIndex(gen, index)); + ASSERT_EQ(MAX_GENERATION, gen); + ASSERT_EQ(0u, index); + manager.commitPushIndex(gen, index); + + ASSERT_EQ(QueueReturn::Success, manager.reservePopIndex(gen, index)); + ASSERT_EQ(MAX_GENERATION, gen); + ASSERT_EQ(0u, index); + manager.commitPopIndex(gen, index); + ASSERT_EQ(0u, manager.size()); + + ASSERT_EQ(QueueReturn::Success, manager.reservePushIndex(gen, index)); + ASSERT_EQ(0u, gen); + ASSERT_EQ(0u, index); + manager.commitPushIndex(gen, index); + ASSERT_EQ(1u, manager.size()); + + ASSERT_EQ(QueueReturn::Success, manager.reservePopIndex(gen, index)); + ASSERT_EQ(0u, gen); + ASSERT_EQ(0u, index); + manager.commitPopIndex(gen, index); + ASSERT_EQ(0u, manager.size()); +} + +struct CombinedIndexData +{ + uint32_t capacity; + uint32_t pushIndex; + uint32_t popIndex; +}; + +std::ostream& +operator<<(std::ostream& os, CombinedIndexData d) +{ + os << "[ capacity = " << d.capacity << " pushIndex = " << d.pushIndex + << " popIndex = " << d.popIndex << " ]"; + return os; +} + +class PopAtMax : public ::testing::TestWithParam< CombinedIndexData > +{ +}; + +TEST_P(PopAtMax, PopAtMax) +{ + const auto& d = GetParam(); + + QueueManager manager(d.capacity); + + const uint32_t NUM_GEN = QueueManager::numGenerations(d.capacity); + const uint32_t MAX_GEN = NUM_GEN - 1; + + adjustGeneration(manager, MAX_GEN); + + uint32_t gen = 0; + uint32_t index = 0; + + // Push and pop elements up until the pop-index. 
+ + for(size_t j = 0; j < d.popIndex; ++j) + { + uint32_t INDEX = j % d.capacity; + uint32_t GEN = (MAX_GEN + j / d.capacity) % NUM_GEN; + + ASSERT_EQ(QueueReturn::Success, manager.reservePushIndex(gen, index)); + ASSERT_EQ(INDEX, index); + ASSERT_EQ(GEN, gen); + manager.commitPushIndex(gen, index); + ASSERT_EQ(1u, manager.size()); + + ASSERT_EQ(QueueReturn::Success, manager.reservePopIndex(gen, index)); + + ASSERT_EQ(INDEX, index); + ASSERT_EQ(GEN, gen); + manager.commitPopIndex(gen, index); + ASSERT_EQ(0u, manager.size()); + } + + // Push elements up to the push index + + for(size_t j = d.popIndex; j < d.pushIndex; ++j) + { + uint32_t INDEX = j % d.capacity; + uint32_t GEN = (MAX_GEN + j / d.capacity) % NUM_GEN; + + ASSERT_EQ(QueueReturn::Success, manager.reservePushIndex(gen, index)); + + ASSERT_EQ(INDEX, index); + ASSERT_EQ(GEN, gen); + manager.commitPushIndex(gen, index); + ASSERT_EQ(j - d.popIndex + 1, manager.size()); + } + + // Pop elements until the queue is empty. + + for(size_t j = d.popIndex; j < d.pushIndex; ++j) + { + uint32_t INDEX = j % d.capacity; + uint32_t GEN = (MAX_GEN + j / d.capacity) % NUM_GEN; + + ASSERT_EQ(QueueReturn::Success, manager.reservePopIndex(gen, index)); + + ASSERT_EQ(INDEX, index); + ASSERT_EQ(GEN, gen); + manager.commitPopIndex(gen, index); + ASSERT_EQ(d.pushIndex - j - 1, manager.size()); + } +} + +CombinedIndexData PopAtMaxData[] = + { // Capacity 2 queues for a couple generations + {2, 1, 0}, + {2, 2, 0}, + {2, 2, 1}, + {2, 2, 2}, + {2, 3, 1}, + {2, 3, 2}, + {2, 3, 3}, + {2, 4, 2}, + {2, 4, 3}, + {2, 4, 4}, + + // Capacity 3 queues for a couple generations + {3, 2, 0}, + {3, 3, 0}, + {3, 3, 1}, + {3, 3, 2}, + {3, 3, 3}, + {3, 4, 1}, + {3, 4, 2}, + {3, 4, 3}, + {3, 4, 4}, + {3, 5, 2}, + {3, 5, 3}, + {3, 5, 4}, + {3, 5, 5}, + + // Capacity 7 queue + {7, 6, 0}, + {7, 7, 0}, + {7, 7, 6}, + {7, 13, 7}, + {7, 14, 7}}; + +INSTANTIATE_TEST_CASE_P(TestQueueManagerMaxCombinedIndex, PopAtMax, + ::testing::ValuesIn(PopAtMaxData)); + 
class ReservePop : public ::testing::TestWithParam< CombinedIndexData >
{
};

// Verifies reservePopForClear near the maximum generation: every element
// ahead of a pending push reservation can be cleared exactly once, after
// which further clears fail.
TEST_P(ReservePop, ReservePopIndexForClear)
{
  const auto& d = GetParam();

  QueueManager manager(d.capacity);
  const uint32_t NUM_GEN = QueueManager::numGenerations(d.capacity);
  const uint32_t MAX_GEN = NUM_GEN - 1;

  // Start at the last generation so indices wrap during the test.
  adjustGeneration(manager, MAX_GEN);

  // Drive the queue to the parameterized push/pop positions.
  generation(manager, d.pushIndex, d.popIndex);

  // Pop elements until the queue is empty

  uint32_t endGeneration = 0;
  uint32_t endIndex = 0;
  uint32_t gen = 0;
  uint32_t index = 0;

  // Reserve (but do not commit) a push; its slot marks the clear boundary.
  ASSERT_EQ(QueueReturn::Success,
            manager.reservePushIndex(endGeneration, endIndex));

  for(uint32_t j = d.popIndex; j < d.pushIndex; ++j)
  {
    // Expected slot/generation for element j, wrapped modulo NUM_GEN.
    uint32_t INDEX = j % d.capacity;
    uint32_t GEN = (MAX_GEN + j / d.capacity) % NUM_GEN;

    ASSERT_TRUE(
        manager.reservePopForClear(gen, index, endGeneration, endIndex));

    ASSERT_EQ(INDEX, index);
    ASSERT_EQ(GEN, gen);
    manager.commitPopIndex(gen, index);
  }

  // Once the clear cursor reaches the reserved push slot, clearing fails.
  ASSERT_FALSE(manager.reservePopForClear(gen, index, endGeneration, endIndex));
  manager.abortPushIndexReservation(endGeneration, endIndex);
  ASSERT_EQ(0u, manager.size());
}

CombinedIndexData ReservePopIndexForClearData[] = {
    // Capacity 2 queues for a couple generations
    {2, 1, 0},
    {2, 2, 1},
    {2, 2, 2},
    {2, 3, 2},
    {2, 3, 3},
    {2, 4, 3},
    {2, 4, 4},

    // Capacity 3 queues for a couple generations
    {3, 2, 0},
    {3, 3, 1},
    {3, 3, 2},
    {3, 3, 3},
    {3, 4, 2},
    {3, 4, 3},
    {3, 4, 4},
    {3, 5, 3},
    {3, 5, 4},
    {3, 5, 5},

    // Capacity 7 queue
    {7, 6, 0},
    {7, 7, 6},
    {7, 13, 7},
};

INSTANTIATE_TEST_CASE_P(TestQueueManagerMaxCombinedIndex, ReservePop,
                        ::testing::ValuesIn(ReservePopIndexForClearData));

// Parameter record for QueueManager::circularDifference: compute
// (minuend - subtrahend) on a circle of size maxSize and compare against
// expectedValue.
struct CircularDifferenceData
{
  uint32_t minuend;        // starting value
  uint32_t subtrahend;     // value subtracted
  uint32_t maxSize;        // modulo of the circular space
  int32_t expectedValue;   // expected signed shortest difference
};

// Pretty-printer so gtest reports failing parameter rows readably.
std::ostream&
operator<<(std::ostream& os, CircularDifferenceData d)
{
  os << "[ minuend = " << d.minuend << " subtrahend = " << d.subtrahend
     << " maxSize = " << d.maxSize << " expectedValue = " << d.expectedValue
     << " ]";
  return os;
}

class CircularDifference
    : public ::testing::TestWithParam< CircularDifferenceData >
{
};

TEST_P(CircularDifference, difference)
{
  const auto& data = GetParam();

  ASSERT_EQ(data.expectedValue,
            QueueManager::circularDifference(data.minuend, data.subtrahend,
                                             data.maxSize));
}

// INT32_MAX and the largest modulo circularDifference accepts (INT32_MAX + 1).
constexpr uint32_t OUR_INT32_MAX = std::numeric_limits< int32_t >::max();
constexpr uint32_t OUR_INT32_MAX_1 = OUR_INT32_MAX + 1;
constexpr int32_t OUR_INT32_MAX_DIV = OUR_INT32_MAX_1 / 2;

CircularDifferenceData circularDifferenceData[] = {
    // capacity 1
    {0, 0, 1, 0},

    // capacity 2
    {1, 1, 2, 0},
    {1, 0, 2, 1},
    {0, 1, 2, -1},

    // capacity 3
    {2, 0, 3, -1},
    {2, 1, 3, 1},
    {2, 2, 3, 0},
    {1, 0, 3, 1},
    {1, 1, 3, 0},
    {1, 2, 3, -1},
    {0, 0, 3, 0},
    {0, 1, 3, -1},
    {0, 2, 3, 1},

    // capacity 4
    {3, 0, 4, -1},
    {3, 1, 4, 2},
    {3, 2, 4, 1},
    {3, 3, 4, 0},
    {0, 3, 4, 1},
    {1, 3, 4, -2},
    {2, 3, 4, -1},
    {3, 3, 4, 0},

    // capacity INT_MAX
    {OUR_INT32_MAX, 0, OUR_INT32_MAX_1, -1},
    {0, OUR_INT32_MAX, OUR_INT32_MAX_1, 1},
    {OUR_INT32_MAX_DIV, 0, OUR_INT32_MAX_1, OUR_INT32_MAX_DIV},
    {0, OUR_INT32_MAX_DIV, OUR_INT32_MAX_1, -OUR_INT32_MAX_DIV},

    // Degree-circle examples:
    //   circularDifference(  0, 359, 360) ==    1
    //   circularDifference(359,   0, 360) ==   -1
    //   circularDifference(180,   0, 360) ==  180
    //   circularDifference(  0, 180, 360) == -180

    {0, 359, 360, 1},
    {359, 0, 360, -1},
    {180, 0, 360, 180},
    {0, 180, 360, -180},
};

INSTANTIATE_TEST_CASE_P(TestQueueManagerMaxCombinedIndex, CircularDifference,
                        ::testing::ValuesIn(circularDifferenceData));

class NumGenerations : public ::testing::TestWithParam< uint32_t >
{
};

// Checks numGenerations(capacity): at least 2 generations, and either the
// generation count is saturated at the element-state maximum, or it is the
// largest count whose combined index range still fits in MAX_COMBINED_INDEX.
TEST_P(NumGenerations, generations)
{
  uint32_t capacity = GetParam();
  uint32_t numGen = QueueManager::numGenerations(capacity);

  // Top 2 bits of an element word hold state; the rest hold the generation.
  static const uint32_t MAX_ELEMENT_STATE_GEN =
      std::numeric_limits< uint32_t >::max() >> 2;

  // Top bit of a combined index is the disabled flag.
  static const uint32_t MAX_COMBINED_INDEX =
      std::numeric_limits< uint32_t >::max() >> 1;

  ASSERT_GE(numGen, 2u);
  ASSERT_TRUE(MAX_ELEMENT_STATE_GEN == numGen - 1
              || ((numGen * capacity - 1 <= MAX_COMBINED_INDEX)
                  && ((numGen + 1) * capacity - 1 > MAX_COMBINED_INDEX)));
}

uint32_t GenerationData[] = {1,
                             2,
                             3,
                             4,
                             15,
                             16,
                             17,
                             QueueManager::MAX_CAPACITY - 1,
                             QueueManager::MAX_CAPACITY};

INSTANTIATE_TEST_CASE_P(TestQueueManagerMaxCombinedIndex, NumGenerations,
                        ::testing::ValuesIn(GenerationData));

// Aborting a push reservation frees the slot; the next successful push on a
// capacity-1 queue reuses the same index but at the next generation.
TEST(TestQueueManager, abortPushIndexReservation)
{
  uint32_t genA = 0;
  uint32_t genB = 0;
  uint32_t indexA = 0;
  uint32_t indexB = 0;

  QueueManager manager(1);

  ASSERT_EQ(QueueReturn::Success, manager.reservePushIndex(genA, indexA));
  // Queue is full while the reservation is outstanding.
  ASSERT_NE(QueueReturn::Success, manager.reservePushIndex(genA, indexA));

  manager.abortPushIndexReservation(genA, indexA);

  ASSERT_EQ(0u, manager.size());

  ASSERT_EQ(QueueReturn::Success, manager.reservePushIndex(genB, indexB));
  ASSERT_EQ(genA + 1, genB);
  ASSERT_EQ(indexA, indexB);
}

// Parameter record for abort-push tests: drive the queue to pushIndex/
// popIndex, reserve a push, and expect exactly expectedClears successful
// reservePopForClear calls before the boundary is hit.
struct AbortData
{
  uint32_t capacity;        // queue capacity under test
  uint32_t pushIndex;       // total pushes performed before the test body
  uint32_t popIndex;        // total pops performed before the test body
  uint32_t expectedClears;  // elements clearable ahead of the reservation
};

// Pretty-printer so gtest reports failing parameter rows readably.
std::ostream&
operator<<(std::ostream& os, AbortData d)
{
  os << "[ capacity = " << d.capacity << " pushIndex = " << d.pushIndex
     << " popIndex = " << d.popIndex << " expectedClears = " << d.expectedClears
     << " ]";
  return os;
}

class AbortPush : public ::testing::TestWithParam< AbortData >
{
};

TEST_P(AbortPush, abortPush)
{
  const auto& data = GetParam();

  QueueManager manager(data.capacity);

  generation(manager, data.pushIndex, data.popIndex);

  // Generation/index of the push reservation we are about to make.
  const uint32_t END_GENERATION = data.pushIndex / data.capacity;
  const uint32_t END_INDEX = data.pushIndex % data.capacity;

  uint32_t gen = 0;
  uint32_t index = 0;

  ASSERT_EQ(QueueReturn::Success, manager.reservePushIndex(gen, index));
  ASSERT_EQ(END_GENERATION, gen);
  ASSERT_EQ(END_INDEX, index);

  // Clear every element ahead of the reserved slot, in order.
  for(uint32_t i = 0; i < data.expectedClears; ++i)
  {
    ASSERT_TRUE(
        manager.reservePopForClear(gen, index, END_GENERATION, END_INDEX));

    ASSERT_EQ((data.popIndex + i) / data.capacity, gen);
    ASSERT_EQ((data.popIndex + i) % data.capacity, index);

    manager.commitPopIndex(gen, index);
  }

  ASSERT_FALSE(
      manager.reservePopForClear(gen, index, END_GENERATION, END_INDEX));

  manager.abortPushIndexReservation(END_GENERATION, END_INDEX);

  // Verify the queue is now empty, and the current push index has changed

  ASSERT_EQ(0u, manager.size());
  for(uint32_t i = 0; i < data.capacity; ++i)
  {
    ASSERT_EQ(QueueReturn::Success, manager.reservePushIndex(gen, index));
    ASSERT_EQ(i + 1, manager.size());

    // Combined index advances past the aborted slot by i + 1.
    ASSERT_EQ(END_GENERATION * data.capacity + END_INDEX + i + 1,
              gen * data.capacity + index);
  }

  ASSERT_NE(QueueReturn::Success, manager.reservePushIndex(gen, index));
  ASSERT_EQ(data.capacity, manager.size());
}

AbortData abortData[] = {
    {1, 0, 0, 0},

    // Capacity 2 queues for a couple generations
    {2, 0, 0, 0},
    {2, 1, 0, 1},
    {2, 1, 1, 0},
    {2, 2, 1, 1},
    {2, 2, 2, 0},
    {2, 3, 2, 1},
    {2, 3, 3, 0},

    // Capacity 3 queues for a couple generations
    {3, 0, 0, 0},
    {3, 1, 0, 1},
    {3, 1, 1, 0},
    {3, 2, 0, 2},
    {3, 2, 1, 1},
    {3, 2, 2, 0},
    {3, 3, 1, 2},
    {3, 3, 2, 1},
    {3, 3, 3, 0},
    {3, 4, 2, 2},
    {3, 4, 3, 1},
    {3, 4, 4, 0},

    // Capacity 7 queue
    {7, 14, 14, 0},
    {7, 15, 14, 1},
    {7, 20, 14, 6},
    {7, 18, 18, 0},
    {7, 19, 18, 1},
    {7, 24, 18, 6},
};

INSTANTIATE_TEST_CASE_P(TestQueueManagerMaxCombinedIndex, AbortPush,
                        ::testing::ValuesIn(abortData));

// Testing reservePopForClear
// - Failure is returned when the head of the queue is the same as the given
//   end generation and index
// - Success is returned and clears the queue head when the current pop index
//   is not the given end generation and index
// - We do not clear an index reserved for popping

TEST(TestQueueManagerReserve, Capacity1)
{
  // It is not possible to clear a pop index when the capacity is 1.

  uint32_t gen = 0;
  uint32_t index = 0;

  // Random values to verify we didn't change them.
  uint32_t resultGen = 1024;
  uint32_t resultIndex = 1023;

  QueueManager manager(1);

  ASSERT_EQ(QueueReturn::Success, manager.reservePushIndex(gen, index));

  // The only element is the reserved push slot itself, so clearing fails
  // and the output parameters are untouched.
  ASSERT_FALSE(manager.reservePopForClear(resultGen, resultIndex, gen, index));

  ASSERT_EQ(1024u, resultGen);
  ASSERT_EQ(1023u, resultIndex);

  ASSERT_EQ(1u, manager.size());
}

// Walks a capacity-2 queue through several generations: each committed
// element ahead of a new push reservation is clearable exactly once, with
// the expected (generation, index) pair.
TEST(TestQueueManagerReserve, Capacity2)
{
  uint32_t gen = 0;
  uint32_t index = 0;

  // Random values to verify we didn't change them.
  uint32_t resultGen = 1024;
  uint32_t resultIndex = 1023;

  QueueManager manager(2);

  ASSERT_EQ(QueueReturn::Success, manager.reservePushIndex(gen, index));

  // Nothing ahead of the first reservation: clear fails, outputs untouched.
  ASSERT_FALSE(manager.reservePopForClear(resultGen, resultIndex, gen, index));

  ASSERT_EQ(1024u, resultGen);
  ASSERT_EQ(1023u, resultIndex);
  manager.commitPushIndex(gen, index);

  ASSERT_EQ(QueueReturn::Success, manager.reservePushIndex(gen, index));

  // One committed element (gen 0, slot 0) ahead: it can be cleared.
  ASSERT_TRUE(manager.reservePopForClear(resultGen, resultIndex, gen, index));
  ASSERT_EQ(0u, resultGen);
  ASSERT_EQ(0u, resultIndex);
  manager.commitPopIndex(resultGen, resultIndex);

  manager.commitPushIndex(gen, index);
  ASSERT_EQ(QueueReturn::Success, manager.reservePushIndex(gen, index));

  ASSERT_TRUE(manager.reservePopForClear(resultGen, resultIndex, gen, index));
  ASSERT_EQ(0u, resultGen);
  ASSERT_EQ(1u, resultIndex);
  manager.commitPopIndex(resultGen, resultIndex);
  manager.commitPushIndex(gen, index);

  ASSERT_EQ(QueueReturn::Success, manager.reservePushIndex(gen, index));

  // Generation has advanced to 1 as the indices wrap around capacity 2.
  ASSERT_TRUE(manager.reservePopForClear(resultGen, resultIndex, gen, index));
  ASSERT_EQ(1u, resultGen);
  ASSERT_EQ(0u, resultIndex);
  manager.commitPopIndex(resultGen, resultIndex);
  manager.commitPushIndex(gen, index);

  ASSERT_EQ(QueueReturn::Success, manager.reservePushIndex(gen, index));

  ASSERT_TRUE(manager.reservePopForClear(resultGen, resultIndex, gen, index));
  ASSERT_EQ(1u, resultGen);
  ASSERT_EQ(1u, resultIndex);
  manager.commitPopIndex(resultGen, resultIndex);
  manager.commitPushIndex(gen, index);

  ASSERT_EQ(QueueReturn::Success, manager.reservePushIndex(gen, index));

  ASSERT_TRUE(manager.reservePopForClear(resultGen, resultIndex, gen, index));
  ASSERT_EQ(2u, resultGen);
  ASSERT_EQ(0u, resultIndex);
  manager.commitPopIndex(resultGen, resultIndex);
  manager.commitPushIndex(gen, index);
}

// Parameter record mirroring AbortData, but for the commit (rather than
// abort) path after clearing elements ahead of a push reservation.
struct ReserveData
{
  uint32_t capacity;        // queue capacity under test
  uint32_t pushIndex;       // total pushes performed before the test body
  uint32_t popIndex;        // total pops performed before the test body
  uint32_t expectedClears;  // elements clearable ahead of the reservation
};

// Pretty-printer so gtest reports failing parameter rows readably.
std::ostream&
operator<<(std::ostream& os, ReserveData d)
{
  os << "[ capacity = " << d.capacity << " pushIndex = " << d.pushIndex
     << " popIndex = " << d.popIndex << " expectedClears = " << d.expectedClears
     << " ]";
  return os;
}
class Reserve : public ::testing::TestWithParam< ReserveData >
{
};

TEST_P(Reserve, clear)
{
  const auto& data = GetParam();
  QueueManager manager(data.capacity);

  generation(manager, data.pushIndex, data.popIndex);

  // Generation/index of the push reservation we are about to make.
  const uint32_t endGen = data.pushIndex / data.capacity;
  const uint32_t endIdx = data.pushIndex % data.capacity;

  uint32_t gen = 0;
  uint32_t index = 0;
  ASSERT_EQ(QueueReturn::Success, manager.reservePushIndex(gen, index));
  ASSERT_EQ(endGen, gen);
  ASSERT_EQ(endIdx, index);

  // Clear every element ahead of the reserved slot, in order.
  for(unsigned int j = 0; j < data.expectedClears; ++j)
  {
    ASSERT_TRUE(manager.reservePopForClear(gen, index, endGen, endIdx));
    ASSERT_EQ((data.popIndex + j) / data.capacity, gen);
    ASSERT_EQ((data.popIndex + j) % data.capacity, index);
    manager.commitPopIndex(gen, index);
  }
  ASSERT_FALSE(manager.reservePopForClear(gen, index, endGen, endIdx));
  // Committing the outstanding push leaves exactly one element queued.
  manager.commitPushIndex(endGen, endIdx);
  ASSERT_EQ(1u, manager.size());
}

ReserveData reserveData[] = {
    {1, 0, 0, 0},

    // Capacity 2 queues for a couple generations
    {2, 0, 0, 0},
    {2, 1, 0, 1},
    {2, 1, 1, 0},
    {2, 2, 1, 1},
    {2, 2, 2, 0},
    {2, 3, 2, 1},
    {2, 3, 3, 0},

    // Capacity 3 queues for a couple generations
    {3, 0, 0, 0},
    {3, 1, 0, 1},
    {3, 1, 1, 0},
    {3, 2, 0, 2},
    {3, 2, 1, 1},
    {3, 2, 2, 0},
    {3, 3, 1, 2},
    {3, 3, 2, 1},
    {3, 3, 3, 0},
    {3, 4, 2, 2},
    {3, 4, 3, 1},
    {3, 4, 4, 0},

    // Capacity 7 queue
    {7, 14, 14, 0},
    {7, 15, 14, 1},
    {7, 20, 14, 6},
    {7, 18, 18, 0},
    {7, 19, 18, 1},
    {7, 24, 18, 6},
};

INSTANTIATE_TEST_CASE_P(TestQueueManagerReserve, Reserve,
                        ::testing::ValuesIn(reserveData));

// Disabling a queue blocks pushes but still allows pops to drain it;
// enable/disable are idempotent.
TEST(TestQueueManager, Enabled)
{
  QueueManager manager(3);

  ASSERT_TRUE(manager.enabled());

  uint32_t gen = 0;
  uint32_t index = 0;

  // Insert 2 elements.
  ASSERT_EQ(QueueReturn::Success, manager.reservePushIndex(gen, index));
  manager.commitPushIndex(gen, index);
  ASSERT_EQ(1u, manager.size());

  ASSERT_EQ(QueueReturn::Success, manager.reservePushIndex(gen, index));
  manager.commitPushIndex(gen, index);
  ASSERT_EQ(2u, manager.size());

  // Disable the queue.
  manager.disable();
  ASSERT_FALSE(manager.enabled());

  // Test that attempting to push fails.
  ASSERT_EQ(QueueReturn::QueueDisabled, manager.reservePushIndex(gen, index));
  ASSERT_EQ(2u, manager.size());

  // Test that attempting to pop succeeds.
  ASSERT_EQ(QueueReturn::Success, manager.reservePopIndex(gen, index));
  manager.commitPopIndex(gen, index);
  ASSERT_EQ(1u, manager.size());

  // Test that attempting to push still fails.
  ASSERT_EQ(QueueReturn::QueueDisabled, manager.reservePushIndex(gen, index));
  ASSERT_EQ(1u, manager.size());

  // Disable the queue a second time, and verify that has no effect.
  manager.disable();
  ASSERT_FALSE(manager.enabled());

  // Test that attempting to push still fails.
  ASSERT_EQ(QueueReturn::QueueDisabled, manager.reservePushIndex(gen, index));
  ASSERT_EQ(1u, manager.size());

  // Enable the queue.
  manager.enable();
  ASSERT_TRUE(manager.enabled());

  // Test that attempting to push succeeds.
  ASSERT_EQ(QueueReturn::Success, manager.reservePushIndex(gen, index));
  manager.commitPushIndex(gen, index);
  ASSERT_EQ(2u, manager.size());

  // Test that attempting to pop succeeds.
  ASSERT_EQ(QueueReturn::Success, manager.reservePopIndex(gen, index));
  manager.commitPopIndex(gen, index);
  ASSERT_EQ(1u, manager.size());

  // Enable the queue a second time, and verify that has no effect.
  manager.enable();
  ASSERT_TRUE(manager.enabled());

  // Test that attempting to push succeeds.
  ASSERT_EQ(QueueReturn::Success, manager.reservePushIndex(gen, index));
  manager.commitPushIndex(gen, index);
  ASSERT_EQ(2u, manager.size());
}